diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutableRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutableRegionInfo.java
index a9382f3a9bed..81e6d478b79d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutableRegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutableRegionInfo.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.gson.annotations.Expose;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,17 +55,17 @@ class MutableRegionInfo implements RegionInfo {
   // zookeeper as of 0.90.0 HBase. And now in DisableTableProcedure, finally we will create bunch
   // of UnassignProcedures and at the last of the procedure we will set the region state to
   // CLOSED, and will not change the offLine flag.
-  private boolean offLine;
-  private boolean split;
-  private final long regionId;
-  private final int replicaId;
-  private final byte[] regionName;
-  private final byte[] startKey;
-  private final byte[] endKey;
-  private final int hashCode;
-  private final String encodedName;
-  private final byte[] encodedNameAsBytes;
-  private final TableName tableName;
+  @Expose private boolean offLine;
+  @Expose private boolean split;
+  @Expose private final long regionId;
+  @Expose private final int replicaId;
+  @Expose private final byte[] regionName;
+  @Expose private final byte[] startKey;
+  @Expose private final byte[] endKey;
+  @Expose private final int hashCode;
+  @Expose private final String encodedName;
+  @Expose private final byte[] encodedNameAsBytes;
+  @Expose private final TableName tableName;
 
   private static int generateHashCode(final TableName tableName, final byte[] startKey,
     final byte[] endKey, final long regionId, final int replicaId, boolean offLine,
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java
index 5223bac3e5b1..9f32e64b9ff4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ServerName.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.gson.annotations.Expose;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
@@ -82,15 +83,15 @@ public class ServerName implements Comparable<ServerName>, Serializable {
    */
  public static final String UNKNOWN_SERVERNAME = "#unknown#";
 
-  private final String serverName;
-  private final long startCode;
+  @Expose private final String serverName;
+  @Expose private final long startCode;
   private transient Address address;
 
   /**
    * Cached versioned bytes of this ServerName instance.
    * @see #getVersionedBytes()
    */
-  private byte[] bytes;
+  @Expose private byte[] bytes;
   public static final List<ServerName> EMPTY_SERVER_LIST = new ArrayList<>(0);
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
index c799fb9b2f78..0b077b724786 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.gson.annotations.Expose;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -93,14 +94,14 @@ public static boolean isMetaTableName(final TableName tn) {
    */
   public static final TableName OLD_META_TABLE_NAME = getADummyTableName(OLD_META_STR);
 
-  private final byte[] name;
-  private final String nameAsString;
-  private final byte[] namespace;
-  private final String namespaceAsString;
-  private final byte[] qualifier;
-  private final String qualifierAsString;
-  private final boolean systemTable;
-  private final int hashCode;
+  @Expose private final byte[] name;
+  @Expose private final String nameAsString;
+  @Expose private final byte[] namespace;
+  @Expose private final String namespaceAsString;
+  @Expose private final byte[] qualifier;
+  @Expose private final String qualifierAsString;
+  @Expose private final boolean systemTable;
+  @Expose private final int hashCode;
 
   /**
    * Check passed byte array, "tableName", is legal user-space table name.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
index ef3520b31c78..5b35bfbd0edb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/net/Address.java
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hbase.thirdparty.com.google.gson.annotations.Expose;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
@@ -37,7 +38,7 @@
  */
 @InterfaceAudience.Public
 public class Address implements Comparable<Address> {
-  private final HostAndPort hostAndPort;
+  @Expose private final HostAndPort hostAndPort;
 
   private Address(HostAndPort hostAndPort) {
     this.hostAndPort = hostAndPort;
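Reviewer note: the @Expose annotations above only matter because the Gson instance built in
HubSpotCellUtilities below is configured with excludeFieldsWithoutExposeAnnotation(), making
serialization opt-in per field. A minimal sketch of that behavior (the Demo class is
hypothetical, not part of this patch):

import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.gson.GsonBuilder;
import org.apache.hbase.thirdparty.com.google.gson.annotations.Expose;

public class ExposeDemo {
  static class Demo {
    @Expose String kept = "serialized";
    String dropped = "ignored"; // no @Expose, so omitted from the JSON entirely
  }

  public static void main(String[] args) {
    Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
    System.out.println(gson.toJson(new Demo())); // {"kept":"serialized"}
  }
}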
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/hubspot/HubSpotCellUtilities.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/hubspot/HubSpotCellUtilities.java
new file mode 100644
index 000000000000..faa484b5e449
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/hubspot/HubSpotCellUtilities.java
@@ -0,0 +1,340 @@
+package org.apache.hadoop.hbase.hubspot;
+
+import org.agrona.collections.Int2IntCounterMap;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Shorts;
+import org.apache.hbase.thirdparty.com.google.gson.ExclusionStrategy;
+import org.apache.hbase.thirdparty.com.google.gson.FieldAttributes;
+import org.apache.hbase.thirdparty.com.google.gson.Gson;
+import org.apache.hbase.thirdparty.com.google.gson.GsonBuilder;
+import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
+import org.apache.hbase.thirdparty.com.google.gson.JsonDeserializationContext;
+import org.apache.hbase.thirdparty.com.google.gson.JsonDeserializer;
+import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
+import org.apache.hbase.thirdparty.com.google.gson.JsonSerializationContext;
+import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
+import org.apache.yetus.audience.InterfaceAudience;
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@InterfaceAudience.Private
+public final class HubSpotCellUtilities {
+  // TODO: this should be dynamically configured, not hard-coded, but this dramatically
+  // simplifies the initial version
+  public static final short MAX_CELL_COUNT = 360;
+  private static final int TARGET_MAX_CELLS_PER_RS = 72;
+
+  public static final Gson OBJECT_MAPPER = new GsonBuilder()
+    .excludeFieldsWithoutExposeAnnotation()
+    .enableComplexMapKeySerialization()
+    .registerTypeAdapter(Int2IntCounterMap.class, new Int2IntCounterMapAdapter())
+    .registerTypeAdapter(RegionInfo.class,
+      (JsonDeserializer<RegionInfo>) (json, typeOfT, context) -> {
+        JsonObject obj = json.getAsJsonObject();
+
+        boolean split = obj.get("split").getAsBoolean();
+        long regionId = obj.get("regionId").getAsLong();
+        int replicaId = obj.get("replicaId").getAsInt();
+        JsonObject tableName = obj.get("tableName").getAsJsonObject();
+        JsonArray startKey = obj.get("startKey").getAsJsonArray();
+        JsonArray endKey = obj.get("endKey").getAsJsonArray();
+
+        byte[] startKeyBytes = new byte[startKey.size()];
+        byte[] endKeyBytes = new byte[endKey.size()];
+
+        for (int i = 0; i < startKey.size(); i++) {
+          startKeyBytes[i] = startKey.get(i).getAsByte();
+        }
+        for (int i = 0; i < endKey.size(); i++) {
+          endKeyBytes[i] = endKey.get(i).getAsByte();
+        }
+
+        TableName tb = TableName.valueOf(tableName.get("namespaceAsString").getAsString(),
+          tableName.get("qualifierAsString").getAsString());
+
+        return RegionInfoBuilder.newBuilder(tb).setSplit(split).setRegionId(regionId)
+          .setReplicaId(replicaId).setStartKey(startKeyBytes).setEndKey(endKeyBytes).build();
+      })
+    .addDeserializationExclusionStrategy(new ExclusionStrategy() {
+      @Override
+      public boolean shouldSkipField(FieldAttributes f) {
+        // these BalancerClusterState index fields are derivable from the serialized
+        // fields, so they are skipped on the way back in
+        return f.getName().equals("serversToIndex")
+          || f.getName().equals("regionsToIndex")
+          || f.getName().equals("clusterState");
+      }
+
+      @Override
+      public boolean shouldSkipClass(Class<?> clazz) {
+        return false;
+      }
+    })
+    .create();
+
+  public static final ImmutableSet<String> CELL_AWARE_TABLES = ImmutableSet.of("objects-3");
+
+  private HubSpotCellUtilities() {}
+
+  public static int getMaxCellsPerRs(int servers) {
+    return Math.max(TARGET_MAX_CELLS_PER_RS,
+      Ints.checkedCast((long) Math.floor((double) MAX_CELL_COUNT / servers)));
+  }
+
+  public static String toCellSetString(Set<Short> cells) {
+    return cells.stream().sorted().map(x -> Short.toString(x))
+      .collect(Collectors.joining(", ", "{", "}"));
+  }
+
+  public static boolean isStopInclusive(byte[] endKey) {
+    // a stop key is inclusive unless it is exactly the two-byte cell prefix, or is longer
+    // but has only zero bytes after the prefix (i.e. it falls on a cell boundary)
+    return (endKey == null || endKey.length != 2)
+      && (endKey == null || endKey.length <= 2 || !areSubsequentBytesAllZero(endKey, 2));
+  }
+
+  public static short calcNumCells(RegionInfo[] regionInfos, short totalCellCount) {
+    if (regionInfos == null || regionInfos.length == 0) {
+      return 0;
+    }
+
+    Set<Short> cellsInRegions = Arrays.stream(regionInfos)
+      .map(region -> toCells(region.getStartKey(), region.getEndKey(), totalCellCount))
+      .flatMap(Set::stream).collect(Collectors.toSet());
+    return Shorts.checkedCast(cellsInRegions.size());
+  }
+
+  public static Set<Short> toCells(byte[] rawStart, byte[] rawStop, short numCells) {
+    return range(padToTwoBytes(rawStart, (byte) 0), padToTwoBytes(rawStop, (byte) -1), numCells);
+  }
+
+  public static byte[] padToTwoBytes(byte[] key, byte pad) {
+    if (key == null || key.length == 0) {
+      return new byte[] { pad, pad };
+    }
+
+    if (key.length == 1) {
+      return new byte[] { pad, key[0] };
+    }
+
+    return key;
+  }
+
+  public static Set<Short> range(byte[] start, byte[] stop) {
+    return range(start, stop, MAX_CELL_COUNT);
+  }
+
+  public static Set<Short> range(byte[] start, byte[] stop, short numCells) {
+    short stopCellId = toCell(stop, (byte) -1, (short) (numCells - 1));
+    if (stopCellId < 0 || stopCellId > numCells) {
+      stopCellId = numCells;
+    }
+    short startCellId = toCell(start, (byte) 0, (short) 0);
+
+    if (startCellId == stopCellId) {
+      return ImmutableSet.of(startCellId);
+    }
+
+    boolean isStopExclusive = areSubsequentBytesAllZero(stop, 2);
+
+    final IntStream cellStream;
+    if (isStopExclusive) {
+      cellStream = IntStream.range(startCellId, stopCellId);
+    } else {
+      int stopCellIdForcedToIncludeStart = Math.max(stopCellId, startCellId + 1);
+      cellStream = IntStream.rangeClosed(startCellId, stopCellIdForcedToIncludeStart);
+    }
+
+    return cellStream.mapToObj(val -> (short) val).collect(Collectors.toSet());
+  }
+
+  private static boolean areSubsequentBytesAllZero(byte[] stop, int offset) {
+    for (int i = offset; i < stop.length; i++) {
+      if (stop[i] != (byte) 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static short toCell(byte[] key, byte pad, short ifAbsent) {
+    if (key == null) {
+      throw new IllegalArgumentException("Key must be nonnull");
+    }
+
+    // the cell id is the first two bytes of the key, big-endian; one-byte keys are padded
+    return key.length == 0 ? ifAbsent
+      : (key.length >= 2 ? Bytes.toShort(key, 0, 2) : Bytes.toShort(new byte[] { pad, key[0] }));
+  }
+
+  public static List<Integer> computeCellsPerRs(int[][] regionsPerServer, RegionInfo[] regions) {
+    return Arrays.stream(regionsPerServer)
+      .map(regionsOnServer -> (int) Arrays.stream(regionsOnServer)
+        .boxed()
+        .map(i -> regions[i])
+        .flatMap(region -> toCells(region.getStartKey(), region.getEndKey(), MAX_CELL_COUNT).stream())
+        .distinct()
+        .count())
+      .collect(Collectors.toList());
+  }
+
+  public static Int2IntCounterMap computeRegionsPerCell(RegionInfo[] regions) {
+    Int2IntCounterMap result = new Int2IntCounterMap(MAX_CELL_COUNT, 0.6f, 0);
+    for (RegionInfo region : regions) {
+      toCells(region.getStartKey(), region.getEndKey(), MAX_CELL_COUNT)
+        .forEach(cell -> result.incrementAndGet((int) cell));
+    }
+    return result;
+  }
+
+  public static Int2IntCounterMap computeRegionServersPerCell(int[][] regionsPerServer,
+    RegionInfo[] regions) {
+    Int2IntCounterMap result = new Int2IntCounterMap(MAX_CELL_COUNT, 0.6f, 0);
+
+    for (int[] regionsOnServer : regionsPerServer) {
+      Set<Short> cellsOnRegionServer = new HashSet<>();
+      for (int region : regionsOnServer) {
+        Set<Short> cellsForRegion =
+          toCells(regions[region].getStartKey(), regions[region].getEndKey(), MAX_CELL_COUNT);
+        cellsOnRegionServer.addAll(cellsForRegion);
+      }
+      cellsOnRegionServer.forEach(cell -> result.incrementAndGet((int) cell));
+    }
+
+    return result;
+  }
+
+  public static List<Double> computeMaxShareOfCellPerRs(int[][] regionsPerServer,
+    RegionInfo[] regions) {
+    Int2IntCounterMap numRegionsPerCell = new Int2IntCounterMap(0, 0.6f, 0);
+    Int2IntCounterMap numRegionServersPerCell = new Int2IntCounterMap(0, 0.6f, 0);
+    for (int[] regionsOnServer : regionsPerServer) {
+      Set<Short> cellsOnRegionServer = new HashSet<>();
+      for (int region : regionsOnServer) {
+        Set<Short> cellsForRegion =
+          toCells(regions[region].getStartKey(), regions[region].getEndKey(), MAX_CELL_COUNT);
+        cellsOnRegionServer.addAll(cellsForRegion);
+        cellsForRegion.forEach(cell -> numRegionsPerCell.incrementAndGet((int) cell));
+      }
+      cellsOnRegionServer.forEach(cell -> numRegionServersPerCell.incrementAndGet((int) cell));
+    }
+
+    return Arrays.stream(regionsPerServer)
+      .map(regionsOnServer -> {
+        Map<Short, List<Short>> countOfRegionsByCell =
+          Arrays.stream(regionsOnServer).boxed().map(i -> regions[i])
+            .flatMap(region -> toCells(region.getStartKey(), region.getEndKey(), MAX_CELL_COUNT).stream())
+            .collect(Collectors.groupingBy(cell -> cell));
+
+        List<Double> sharesOfCellOnServer = countOfRegionsByCell.keySet()
+          .stream()
+          .map(cell -> {
+            int numRegionsForCellOnServer = countOfRegionsByCell.get(cell).size();
+            int numRegionsForCell = numRegionsPerCell.get((int) cell);
+            return (double) numRegionsForCellOnServer / numRegionsForCell;
+          }).collect(Collectors.toList());
+
+        return sharesOfCellOnServer.stream()
+          .mapToDouble(x -> x)
+          .max()
+          .orElse(0.0);
+      })
+      .collect(Collectors.toList());
+  }
+
+  static class Int2IntCounterMapAdapter
+    implements JsonSerializer<Int2IntCounterMap>, JsonDeserializer<Int2IntCounterMap> {
+    @Override
+    public JsonElement serialize(Int2IntCounterMap src, Type typeOfSrc,
+      JsonSerializationContext context) {
+      JsonObject obj = new JsonObject();
+
+      obj.addProperty("loadFactor", src.loadFactor());
+      obj.addProperty("initialValue", src.initialValue());
+      obj.addProperty("resizeThreshold", src.resizeThreshold());
+      obj.addProperty("size", src.size());
+
+      // the backing int[] is private, so it is pulled out via reflection
+      Field entryField;
+      try {
+        entryField = Int2IntCounterMap.class.getDeclaredField("entries");
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException(e);
+      }
+      entryField.setAccessible(true);
+      int[] entries;
+      try {
+        entries = (int[]) entryField.get(src);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+      JsonArray entryArray = new JsonArray(entries.length);
+      for (int entry : entries) {
+        entryArray.add(entry);
+      }
+      obj.add("entries", entryArray);
+
+      return obj;
+    }
+
+    @Override
+    public Int2IntCounterMap deserialize(JsonElement json, Type typeOfT,
+      JsonDeserializationContext context) throws JsonParseException {
+      JsonObject obj = json.getAsJsonObject();
+
+      float loadFactor = obj.get("loadFactor").getAsFloat();
+      int initialValue = obj.get("initialValue").getAsInt();
+      int resizeThreshold = obj.get("resizeThreshold").getAsInt();
+      int size = obj.get("size").getAsInt();
+
+      JsonArray entryArray = obj.get("entries").getAsJsonArray();
+      int[] entries = new int[entryArray.size()];
+
+      for (int i = 0; i < entryArray.size(); i++) {
+        entries[i] = entryArray.get(i).getAsInt();
+      }
+
+      Int2IntCounterMap result = new Int2IntCounterMap(0, loadFactor, initialValue);
+
+      // restore the private state in place rather than replaying increments
+      Field resizeThresholdField;
+      Field entryField;
+      Field sizeField;
+
+      try {
+        resizeThresholdField = Int2IntCounterMap.class.getDeclaredField("resizeThreshold");
+        entryField = Int2IntCounterMap.class.getDeclaredField("entries");
+        sizeField = Int2IntCounterMap.class.getDeclaredField("size");
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException(e);
+      }
+
+      resizeThresholdField.setAccessible(true);
+      entryField.setAccessible(true);
+      sizeField.setAccessible(true);
+
+      try {
+        resizeThresholdField.set(result, resizeThreshold);
+        entryField.set(result, entries);
+        sizeField.set(result, size);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+
+      return result;
+    }
+  }
+}
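Reviewer note: a self-contained sanity check of the cell-id convention the utilities above
assume — the cell is the first two bytes of the row key read as a big-endian short (what
Bytes.toShort does), and a stop key whose bytes after the prefix are all zero is treated as
exclusive. Plain Java, no HBase dependency; the values are illustrative only.

public class CellIdDemo {
  static short cellOf(byte[] key) {
    return (short) (((key[0] & 0xFF) << 8) | (key[1] & 0xFF));
  }

  public static void main(String[] args) {
    byte[] regionStart = { 0x00, 0x2A };     // cell 42
    byte[] regionEnd = { 0x00, 0x2D, 0x00 }; // cell 45; trailing zeros => exclusive stop
    System.out.println(cellOf(regionStart)); // 42
    System.out.println(cellOf(regionEnd));   // 45; range(start, end) yields {42, 43, 44}
  }
}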
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
index 4a9bdfee708a..de1cb5793017 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.net.Address;
+import org.apache.hbase.thirdparty.com.google.gson.annotations.Expose;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,68 +53,68 @@ class BalancerClusterState {
 
   private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);
 
-  ServerName[] servers;
+  @Expose ServerName[] servers; // ServerName uniquely identifies a region server. multiple RS can run on the same host
-  String[] hosts;
-  String[] racks;
-  boolean multiServersPerHost = false; // whether or not any host has more than one server
+  @Expose String[] hosts;
+  @Expose String[] racks;
+  @Expose boolean multiServersPerHost = false; // whether or not any host has more than one server
 
-  ArrayList<String> tables;
-  RegionInfo[] regions;
-  Deque<BalancerRegionLoad>[] regionLoads;
+  @Expose ArrayList<String> tables;
+  @Expose RegionInfo[] regions;
+  @Expose Deque<BalancerRegionLoad>[] regionLoads;
   private RegionLocationFinder regionFinder;
 
-  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality
+  @Expose int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality
 
-  int[] serverIndexToHostIndex; // serverIndex -> host index
-  int[] serverIndexToRackIndex; // serverIndex -> rack index
+  @Expose int[] serverIndexToHostIndex; // serverIndex -> host index
+  @Expose int[] serverIndexToRackIndex; // serverIndex -> rack index
 
-  int[][] regionsPerServer; // serverIndex -> region list
-  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
-  int[][] regionsPerHost; // hostIndex -> list of regions
-  int[][] regionsPerRack; // rackIndex -> region list
-  Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
-                                                       // replicas by primary region index
-  Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas
-                                                     // by primary region index
-  Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas
-                                                     // by primary region index
+  @Expose int[][] regionsPerServer; // serverIndex -> region list
+  @Expose int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
+  @Expose int[][] regionsPerHost; // hostIndex -> list of regions
+  @Expose int[][] regionsPerRack; // rackIndex -> region list
+  @Expose Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of
+                                                               // colocated replicas by primary
+                                                               // region index
+  @Expose Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated
+                                                             // replicas by primary region index
+  @Expose Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated
+                                                             // replicas by primary region index
 
-  int[][] serversPerHost; // hostIndex -> list of server indexes
-  int[][] serversPerRack; // rackIndex -> list of server indexes
-  int[] regionIndexToServerIndex; // regionIndex -> serverIndex
-  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
-  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
-  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
-  int[] numRegionsPerTable; // tableIndex -> region count
-  int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
-  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
-  boolean hasRegionReplicas = false; // whether there is regions with replicas
+  @Expose int[][] serversPerHost; // hostIndex -> list of server indexes
+  @Expose int[][] serversPerRack; // rackIndex -> list of server indexes
+  @Expose int[] regionIndexToServerIndex; // regionIndex -> serverIndex
+  @Expose int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
+  @Expose int[] regionIndexToTableIndex; // regionIndex -> tableIndex
+  @Expose int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
+  @Expose int[] numRegionsPerTable; // tableIndex -> region count
+  @Expose int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
+  @Expose int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
+  @Expose boolean hasRegionReplicas = false; // whether there are regions with replicas
 
-  Integer[] serverIndicesSortedByRegionCount;
-  Integer[] serverIndicesSortedByLocality;
+  @Expose Integer[] serverIndicesSortedByRegionCount;
+  @Expose Integer[] serverIndicesSortedByLocality;
 
-  Map<Address, Integer> serversToIndex;
-  Map<String, Integer> hostsToIndex;
-  Map<String, Integer> racksToIndex;
-  Map<String, Integer> tablesToIndex;
-  Map<RegionInfo, Integer> regionsToIndex;
-  float[] localityPerServer;
+  @Expose Map<Address, Integer> serversToIndex;
+  @Expose Map<String, Integer> hostsToIndex;
+  @Expose Map<String, Integer> racksToIndex;
+  @Expose Map<String, Integer> tablesToIndex;
+  @Expose Map<RegionInfo, Integer> regionsToIndex;
+  @Expose float[] localityPerServer;
 
-  int numServers;
-  int numHosts;
-  int numRacks;
-  int numTables;
-  int numRegions;
+  @Expose int numServers;
+  @Expose int numHosts;
+  @Expose int numRacks;
+  @Expose int numTables;
+  @Expose int numRegions;
 
-  int numMovedRegions = 0; // num moved regions from the initial configuration
-  Map<ServerName, List<RegionInfo>> clusterState;
+  @Expose int numMovedRegions = 0; // num moved regions from the initial configuration
+  @Expose Map<ServerName, List<RegionInfo>> clusterState;
 
   private final RackManager rackManager;
   // Maps region -> rackIndex -> locality of region on rack
-  private float[][] rackLocalities;
+  @Expose private float[][] rackLocalities;
   // Maps localityType -> region -> [server|rack]Index with highest locality
-  private int[][] regionsToMostLocalEntities;
+  @Expose private int[][] regionsToMostLocalEntities;
 
   static class DefaultRackManager extends RackManager {
     @Override
@@ -407,6 +408,7 @@ private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
     if (regionFinder != null) {
       // region location
       List<ServerName> loc = regionFinder.getTopBlockLocations(region);
+      LOG.debug("{} is located on {} servers", region.getRegionNameAsString(), loc.size());
       regionLocations[regionIndex] = new int[loc.size()];
       for (int i = 0; i < loc.size(); i++) {
         regionLocations[regionIndex][i] = loc.get(i) == null
@@ -415,6 +417,8 @@ private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
           ? -1
           : serversToIndex.get(loc.get(i).getAddress()));
       }
+    } else {
+      LOG.warn("Region finder is null, not registering region {}",
+        region.getRegionNameAsString());
     }
   }
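Reviewer note: the candidate generator introduced next repeatedly picks a uniformly random
element among tied candidates with a single pass, using size-1 reservoir sampling. A minimal
standalone sketch of that pattern (names hypothetical), for readers unfamiliar with it:

import java.util.concurrent.ThreadLocalRandom;

public class ReservoirPickDemo {
  // Returns the index of a uniformly random maximum element of values: track the running
  // max, and among ties keep whichever candidate drew the highest random number.
  static int pickRandomMax(int[] values) {
    int best = Integer.MIN_VALUE;
    int picked = -1;
    double reservoirRandom = -1;
    for (int i = 0; i < values.length; i++) {
      double candidateRandom = ThreadLocalRandom.current().nextDouble();
      if (values[i] > best) {
        best = values[i];
        picked = i;
        reservoirRandom = candidateRandom;
      } else if (values[i] == best && candidateRandom > reservoirRandom) {
        picked = i;
        reservoirRandom = candidateRandom;
      }
    }
    return picked;
  }

  public static void main(String[] args) {
    System.out.println(pickRandomMax(new int[] { 3, 7, 7, 1 })); // 1 or 2, each ~50%
  }
}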
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixCandidateGenerator.java
new file mode 100644
index 000000000000..c92c3c9b1f64
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixCandidateGenerator.java
@@ -0,0 +1,508 @@
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.hubspot.HubSpotCellUtilities;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
+
+@InterfaceAudience.Private
+abstract class PrefixCandidateGenerator extends CandidateGenerator {
+  private static final Logger LOG = LoggerFactory.getLogger(PrefixCandidateGenerator.class);
+  protected static final int NO_REGION = -1;
+  static final boolean IS_DEBUG = false;
+
+  enum ComparisonMode {
+    STRICT,
+    ALLOW_OFF_BY_ONE
+  }
+
+  protected BalanceAction tryMoveRegionFromSomeOverloadedServer(BalancerClusterState cluster,
+    int[] cellCounts, List<Map<Short, Integer>> cellGroupSizesPerServer,
+    int targetRegionsPerServer) {
+    Optional<Integer> fromServerMaybe =
+      pickOverloadedServer(cluster, targetRegionsPerServer, ComparisonMode.ALLOW_OFF_BY_ONE);
+    if (!fromServerMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+
+    int fromServer = fromServerMaybe.get();
+    Optional<Integer> toServerMaybe =
+      pickUnderloadedServer(cluster, targetRegionsPerServer, ComparisonMode.ALLOW_OFF_BY_ONE);
+    if (!toServerMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+    int toServer = toServerMaybe.get();
+    short cell = pickMostFrequentCell(cluster, cellCounts, cellGroupSizesPerServer.get(fromServer));
+
+    return moveCell("evacuate overloaded - target = " + targetRegionsPerServer, fromServer, cell,
+      toServer, cellGroupSizesPerServer, cluster);
+  }
+
+  protected BalanceAction swapRegionsToDecreaseDistinctCellsPerServer(BalancerClusterState cluster,
+    int[] cellCounts, List<Map<Short, Integer>> cellGroupSizesPerServer,
+    int targetCellsPerServer) {
+    Optional<Integer> fromServerMaybe =
+      pickServerWithTooManyCells(cluster, cellGroupSizesPerServer, targetCellsPerServer);
+    if (!fromServerMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+    int fromServer = fromServerMaybe.get();
+    short fromCell =
+      pickLeastFrequentCell(cluster, cellCounts, cellGroupSizesPerServer.get(fromServer));
+
+    Optional<Pair<Short, Integer>> toCellMaybe = pickCellOnServerPresentOnSource(cluster,
+      cellCounts, cellGroupSizesPerServer, fromServer, fromCell);
+    if (!toCellMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+
+    short toCell = toCellMaybe.get().getFirst();
+    int toServer = toCellMaybe.get().getSecond();
+
+    return swapCells("swap to decrease", fromServer, fromCell, toServer, toCell,
+      cellGroupSizesPerServer, cluster);
+  }
+
+  protected Optional<Pair<Short, Integer>> pickCellOnServerPresentOnSource(
+    BalancerClusterState cluster, int[] cellCounts,
+    List<Map<Short, Integer>> cellGroupSizesPerServer, int fromServer, short cell) {
+    Map<Short, Integer> countsForFromServer = cellGroupSizesPerServer.get(fromServer);
+    Optional<Pair<Short, Integer>> result = Optional.empty();
+
+    // randomly select one using a simplified inline reservoir sample
+    // See: http://gregable.com/2007/10/reservoir-sampling.html
+    double reservoirRandom = -1;
+    for (int server = 0; server < cluster.numServers; server++) {
+      if (server == fromServer) {
+        continue;
+      }
+
+      Map<Short, Integer> countsForToCandidate = cellGroupSizesPerServer.get(server);
+      Set<Short> candidateCellsOnTo = new HashSet<>();
+      for (short cellOnTo : countsForToCandidate.keySet()) {
+        if (cellOnTo != cell && countsForFromServer.containsKey(cellOnTo)) {
+          candidateCellsOnTo.add(cellOnTo);
+        }
+      }
+
+      if (!candidateCellsOnTo.isEmpty()) {
+        double candidateRandom = ThreadLocalRandom.current().nextDouble();
+        if (candidateRandom > reservoirRandom) {
+          reservoirRandom = candidateRandom;
+          result = Optional.of(Pair.newPair(candidateCellsOnTo.stream().findAny().get(), server));
+        }
+      }
+    }
+
+    return result;
+  }
+
+  protected Optional<Integer> pickServerWithTooManyCells(BalancerClusterState cluster,
+    List<Map<Short, Integer>> cellGroupSizesPerServer, int targetCellsPerServer) {
+    // randomly select one using a simplified inline reservoir sample
+    // See: http://gregable.com/2007/10/reservoir-sampling.html
+    Optional<Integer> result = Optional.empty();
+    int highestSoFar = Integer.MIN_VALUE;
+    double reservoirRandom = -1;
+
+    for (int server = 0; server < cluster.numServers; server++) {
+      int numCellsOnServer = cellGroupSizesPerServer.get(server).keySet().size();
+      if (numCellsOnServer > targetCellsPerServer) {
+        if (numCellsOnServer > highestSoFar) {
+          highestSoFar = numCellsOnServer;
+          reservoirRandom = ThreadLocalRandom.current().nextDouble();
+          result = Optional.of(server);
+        } else if (numCellsOnServer == highestSoFar) {
+          double candidateRandom = ThreadLocalRandom.current().nextDouble();
+          if (candidateRandom > reservoirRandom) {
+            reservoirRandom = candidateRandom;
+            result = Optional.of(server);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+
+  protected BalanceAction tryMoveRegionToSomeUnderloadedServer(BalancerClusterState cluster,
+    int[] cellCounts, List<Map<Short, Integer>> cellGroupSizesPerServer,
+    int targetRegionsPerServer) {
+    Optional<Integer> toServerMaybe =
+      pickUnderloadedServer(cluster, targetRegionsPerServer, ComparisonMode.STRICT);
+    if (!toServerMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+
+    int toServer = toServerMaybe.get();
+    Optional<Integer> fromServerMaybe =
+      pickOverloadedServer(cluster, targetRegionsPerServer, ComparisonMode.STRICT);
+    if (!fromServerMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+    int fromServer = fromServerMaybe.get();
+    short cell = pickMostFrequentCell(cluster, cellCounts, cellGroupSizesPerServer.get(fromServer));
+
+    return moveCell("fill underloaded - target = " + targetRegionsPerServer, fromServer, cell,
+      toServer, cellGroupSizesPerServer, cluster);
+  }
+
+  protected Optional<Integer> pickOverloadedServer(BalancerClusterState cluster,
+    int targetRegionsPerServer, ComparisonMode mode) {
+    int[][] regionsPerServer = cluster.regionsPerServer;
+    Optional<Integer> pickedServer = Optional.empty();
+    int mostRegionsPerServerSoFar = Integer.MIN_VALUE;
+    double reservoirRandom = -1;
+    int target = targetRegionsPerServer + (mode == ComparisonMode.STRICT ? 0 : 1);
+
+    for (int server = 0; server < cluster.numServers; server++) {
+      int[] regions = regionsPerServer[server];
+      int numRegionsOnServer = regions.length;
+      if (numRegionsOnServer > target) {
+        double candidateRandom = ThreadLocalRandom.current().nextDouble();
+        if (numRegionsOnServer > mostRegionsPerServerSoFar) {
+          pickedServer = Optional.of(server);
+          reservoirRandom = candidateRandom;
+          mostRegionsPerServerSoFar = numRegionsOnServer;
+        } else if (numRegionsOnServer == mostRegionsPerServerSoFar
+          && candidateRandom > reservoirRandom) {
+          pickedServer = Optional.of(server);
+          reservoirRandom = candidateRandom;
+        }
+      }
+    }
+
+    return pickedServer;
+  }
+
+  protected Optional<Integer> pickUnderloadedServer(BalancerClusterState cluster,
+    int targetRegionsPerServer, ComparisonMode mode) {
+    Optional<Integer> pickedServer = Optional.empty();
+    double reservoirRandom = -1;
+    int target = targetRegionsPerServer + (mode == ComparisonMode.STRICT ? 0 : 1);
+
+    for (int server = 0; server < cluster.numServers; server++) {
+      if (cluster.regionsPerServer[server].length < target) {
+        double candidateRandom = ThreadLocalRandom.current().nextDouble();
+        if (!pickedServer.isPresent()) {
+          pickedServer = Optional.of(server);
+          reservoirRandom = candidateRandom;
+        } else if (candidateRandom > reservoirRandom) {
+          pickedServer = Optional.of(server);
+          reservoirRandom = candidateRandom;
+        }
+      }
+    }
+
+    return pickedServer;
+  }
+
+  protected BalanceAction swapRegionsToIncreaseDistinctCellsPerServer(BalancerClusterState cluster,
+    int[] cellCounts, List<Map<Short, Integer>> cellGroupSizesPerServer,
+    int targetCellsPerServer) {
+    Optional<Integer> fromServerMaybe =
+      pickServerWithoutEnoughIsolation(cluster, cellGroupSizesPerServer, targetCellsPerServer);
+    if (!fromServerMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+    int fromServer = fromServerMaybe.get();
+    short fromCell =
+      pickMostFrequentCell(cluster, cellCounts, cellGroupSizesPerServer.get(fromServer));
+
+    Optional<Pair<Short, Integer>> toCellMaybe = pickCellOnServerNotPresentOnSource(cluster,
+      cellCounts, cellGroupSizesPerServer, fromServer, fromCell);
+    if (!toCellMaybe.isPresent()) {
+      return BalanceAction.NULL_ACTION;
+    }
+
+    short toCell = toCellMaybe.get().getFirst();
+    int toServer = toCellMaybe.get().getSecond();
+
+    return swapCells("swap to increase", fromServer, fromCell, toServer, toCell,
+      cellGroupSizesPerServer, cluster);
+  }
+
+  protected Optional<Pair<Short, Integer>> pickCellOnServerNotPresentOnSource(
+    BalancerClusterState cluster, int[] cellCounts,
+    List<Map<Short, Integer>> cellGroupSizesPerServer, int fromServer, short cell) {
+    Map<Short, Integer> countsForFromServer = cellGroupSizesPerServer.get(fromServer);
+    Optional<Pair<Short, Integer>> result = Optional.empty();
+
+    // randomly select one using a simplified inline reservoir sample
+    // See: http://gregable.com/2007/10/reservoir-sampling.html
+    double reservoirRandom = -1;
+    for (int server = 0; server < cluster.numServers; server++) {
+      if (server == fromServer) {
+        continue;
+      }
+
+      Map<Short, Integer> countsForToCandidate = cellGroupSizesPerServer.get(server);
+      Set<Short> candidateCellsOnTo = new HashSet<>();
+      for (short cellOnTo : countsForToCandidate.keySet()) {
+        int regionsForCell = cellCounts[cellOnTo];
+        int expectedCountOnAllServers =
+          Ints.checkedCast((long) Math.floor((double) regionsForCell / cluster.numServers));
+
+        if (!countsForFromServer.containsKey(cellOnTo)
+          || countsForFromServer.get(cellOnTo) <= expectedCountOnAllServers) {
+          candidateCellsOnTo.add(cellOnTo);
+        }
+      }
+
+      if (!countsForToCandidate.containsKey(cell) && !candidateCellsOnTo.isEmpty()) {
+        double candidateRandom = ThreadLocalRandom.current().nextDouble();
+        if (candidateRandom > reservoirRandom) {
+          reservoirRandom = candidateRandom;
+          result = Optional.of(Pair.newPair(candidateCellsOnTo.stream().findAny().get(), server));
+        }
+      }
+    }
+
+    return result;
+  }
+
+  protected Optional<Integer> pickServerWithoutEnoughIsolation(BalancerClusterState cluster,
+    List<Map<Short, Integer>> cellGroupSizesPerServer, int targetCellsPerServer) {
+    // randomly select one using a simplified inline reservoir sample
+    // See: http://gregable.com/2007/10/reservoir-sampling.html
+    Optional<Integer> result = Optional.empty();
+    int lowestSoFar = Integer.MAX_VALUE;
+    double reservoirRandom = -1;
+
+    for (int server = 0; server < cluster.numServers; server++) {
+      int numCellsOnServer = cellGroupSizesPerServer.get(server).keySet().size();
+      if (numCellsOnServer < targetCellsPerServer) {
+        if (numCellsOnServer < lowestSoFar) {
+          lowestSoFar = numCellsOnServer;
+          reservoirRandom = ThreadLocalRandom.current().nextDouble();
+          result = Optional.of(server);
+        } else if (numCellsOnServer == lowestSoFar) {
+          double candidateRandom = ThreadLocalRandom.current().nextDouble();
+          if (candidateRandom > reservoirRandom) {
+            reservoirRandom = candidateRandom;
+            result = Optional.of(server);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+
+  protected short pickMostFrequentCell(BalancerClusterState cluster, int[] cellCounts,
+    Map<Short, Integer> cellCountsForServer) {
+    List<Short> cellsOrderedLeastToMostFrequent =
+      getCellsOrderedLeastToMostFrequent(cluster, cellCounts, cellCountsForServer);
+
+    // randomly select one using a simplified inline reservoir sample
+    // See: http://gregable.com/2007/10/reservoir-sampling.html
+    Optional<Short> result =
+      Optional.of(cellsOrderedLeastToMostFrequent.get(cellsOrderedLeastToMostFrequent.size() - 1));
+    int highestSoFar = cellCountsForServer
+      .get(cellsOrderedLeastToMostFrequent.get(cellsOrderedLeastToMostFrequent.size() - 1));
+    double reservoirRandom = ThreadLocalRandom.current().nextDouble();
+
+    for (int cellIndex = cellsOrderedLeastToMostFrequent.size() - 2; cellIndex >= 0; cellIndex--) {
+      short cell = cellsOrderedLeastToMostFrequent.get(cellIndex);
+      int numInstancesOfCell = cellCountsForServer.get(cell);
+      if (numInstancesOfCell < highestSoFar) {
+        break;
+      }
+
+      double candidateRandom = ThreadLocalRandom.current().nextDouble();
+      if (candidateRandom > reservoirRandom) {
+        reservoirRandom = candidateRandom;
+        result = Optional.of(cell);
+      }
+    }
+
+    return result.get();
+  }
+
+  protected short pickLeastFrequentCell(BalancerClusterState cluster, int[] cellCounts,
+    Map<Short, Integer> cellCountsForServer) {
+    List<Short> cellsOrderedLeastToMostFrequent =
+      getCellsOrderedLeastToMostFrequent(cluster, cellCounts, cellCountsForServer);
+
+    // randomly select one using a simplified inline reservoir sample
+    // See: http://gregable.com/2007/10/reservoir-sampling.html
+    Optional<Short> result = Optional.of(cellsOrderedLeastToMostFrequent.get(0));
+    int lowestSoFar = cellCountsForServer.get(cellsOrderedLeastToMostFrequent.get(0));
+    double reservoirRandom = ThreadLocalRandom.current().nextDouble();
+
+    for (int cellIndex = 1; cellIndex < cellsOrderedLeastToMostFrequent.size(); cellIndex++) {
+      short cell = cellsOrderedLeastToMostFrequent.get(cellIndex);
+      int numInstancesOfCell = cellCountsForServer.get(cell);
+      if (numInstancesOfCell > lowestSoFar) {
+        break;
+      }
+
+      double candidateRandom = ThreadLocalRandom.current().nextDouble();
+      if (candidateRandom > reservoirRandom) {
+        reservoirRandom = candidateRandom;
+        result = Optional.of(cell);
+      }
+    }
+
+    return result.get();
+  }
+
+  private List<Short> getCellsOrderedLeastToMostFrequent(BalancerClusterState cluster,
+    int[] cellCounts, Map<Short, Integer> cellCountsForServer) {
+    return cellCountsForServer.keySet().stream().sorted(Comparator.comparing(cell -> {
+      int regionsForCell = cellCounts[cell];
+      int expectedCountOnAllServers =
+        Ints.checkedCast((long) Math.floor((double) regionsForCell / cluster.numServers));
+
+      return cellCountsForServer.get(cell) - expectedCountOnAllServers;
+    })).collect(Collectors.toList());
+  }
+
+  private MoveRegionAction moveCell(String originStep, int fromServer, short fromCell,
+    int toServer, List<Map<Short, Integer>> cellGroupSizesPerServer,
+    BalancerClusterState cluster) {
+    if (LOG.isDebugEnabled() || IS_DEBUG) {
+      Map<Short, Integer> fromCounts = cellGroupSizesPerServer.get(fromServer);
+      Map<Short, Integer> toCounts = cellGroupSizesPerServer.get(toServer);
+
+      String fromCountsString =
+        fromCounts.values().stream().mapToInt(x -> x).sum() + "." + fromCounts.entrySet().stream()
+          .sorted(Map.Entry.comparingByKey())
+          .map(entry -> (entry.getKey() == fromCell ? "<<" : "") + entry.getKey() + "="
+            + entry.getValue() + (entry.getKey() == fromCell ? ">>" : ""))
+          .collect(Collectors.joining(", ", "{", "}"));
+      String toCountsString =
+        toCounts.values().stream().mapToInt(x -> x).sum() + "." + toCounts.entrySet().stream()
+          .sorted(Map.Entry.comparingByKey())
+          .map(entry -> (entry.getKey() == fromCell ? ">>" : "") + entry.getKey() + "="
+            + entry.getValue() + (entry.getKey() == fromCell ? "<<" : ""))
+          .collect(Collectors.joining(", ", "{", "}"));
+
+      String debugString =
+        String.format("[%20s]\t\tmove %d:%d -> %d\n\t   %s\n\t-> %s\n", originStep, fromServer,
+          fromCell, toServer, fromCountsString, toCountsString);
+      System.out.print(debugString);
+      LOG.debug("{}", debugString);
+    }
+
+    return (MoveRegionAction) getAction(fromServer,
+      resolveCellToRegion(cluster, fromServer, fromCell), toServer, NO_REGION);
+  }
+
+  private SwapRegionsAction swapCells(String originStep, int fromServer, short fromCell,
+    int toServer, short toCell, List<Map<Short, Integer>> cellGroupSizesPerServer,
+    BalancerClusterState cluster) {
+    if (LOG.isDebugEnabled() || IS_DEBUG) {
+      Map<Short, Integer> fromCounts = cellGroupSizesPerServer.get(fromServer);
+      Map<Short, Integer> toCounts = cellGroupSizesPerServer.get(toServer);
+
+      String fromCountsString =
+        fromCounts.values().stream().mapToInt(x -> x).sum() + "." + fromCounts.entrySet().stream()
          .sorted(Map.Entry.comparingByKey())
+          .map(entry -> (entry.getKey() == fromCell ? "<<" : "")
+            + (entry.getKey() == toCell ? ">>" : "") + entry.getKey() + "=" + entry.getValue()
+            + (entry.getKey() == fromCell ? ">>" : "") + (entry.getKey() == toCell ? "<<" : ""))
+          .collect(Collectors.joining(", ", "{", "}"));
+      String toCountsString =
+        toCounts.values().stream().mapToInt(x -> x).sum() + "." + toCounts.entrySet().stream()
+          .sorted(Map.Entry.comparingByKey())
+          .map(entry -> (entry.getKey() == toCell ? "<<" : "")
+            + (entry.getKey() == fromCell ? ">>" : "") + entry.getKey() + "=" + entry.getValue()
+            + (entry.getKey() == toCell ? ">>" : "") + (entry.getKey() == fromCell ? "<<" : ""))
+          .collect(Collectors.joining(", ", "{", "}"));
+
+      String debugString =
+        String.format("[%20s]\t\tswap %3d:%3d <-> %3d:%3d \n\t    %s\n\t<-> %s\n", originStep,
+          fromServer, fromCell, toServer, toCell, fromCountsString, toCountsString);
+      System.out.print(debugString);
+      LOG.debug("{}", debugString);
+    }
+
+    return (SwapRegionsAction) getAction(fromServer,
+      resolveCellToRegion(cluster, fromServer, fromCell), toServer,
+      resolveCellToRegion(cluster, toServer, toCell));
+  }
+
+  private int resolveCellToRegion(BalancerClusterState cluster, int server, short cell) {
+    Multimap<Integer, Short> cellsByRegion =
+      computeCellsByRegion(cluster.regionsPerServer[server], cluster.regions);
+    return pickRegionForCell(cellsByRegion, cell);
+  }
+
+  private int pickRegionForCell(Multimap<Integer, Short> cellsByRegionOnServer,
+    short cellToMove) {
+    return cellsByRegionOnServer.keySet().stream()
+      .filter(region -> cellsByRegionOnServer.get(region).contains(cellToMove))
+      .min(Comparator.comparingInt(region -> cellsByRegionOnServer.get(region).size()))
+      .orElse(NO_REGION);
+  }
+
+  protected static Map<Short, Integer> computeCellGroupSizes(BalancerClusterState cluster,
+    int[] regionsForServer) {
+    Map<Short, Integer> cellGroupSizes = new HashMap<>();
+    int[] cellCounts = new int[HubSpotCellUtilities.MAX_CELL_COUNT];
+
+    for (int regionIndex : regionsForServer) {
+      if (regionIndex < 0 || regionIndex >= cluster.regions.length) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Skipping region index {} because it's <0 or >={}", regionIndex,
+            cluster.regions.length);
+        }
+        continue;
+      }
+
+      RegionInfo region = cluster.regions[regionIndex];
+
+      if (!region.getTable().getNamespaceAsString().equals("default")) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Skipping region {} because it's not in the default namespace",
+            region.getTable().getNameWithNamespaceInclAsString());
+        }
+        continue;
+      }
+
+      HubSpotCellUtilities.range(region.getStartKey(), region.getEndKey(),
+        HubSpotCellUtilities.MAX_CELL_COUNT).forEach(cell -> cellCounts[cell]++);
+    }
+
+    for (short c = 0; c < cellCounts.length; c++) {
+      if (cellCounts[c] > 0) {
+        cellGroupSizes.put(c, cellCounts[c]);
+      }
+    }
+
+    return cellGroupSizes;
+  }
+
+  private Multimap<Integer, Short> computeCellsByRegion(int[] regionIndices,
+    RegionInfo[] regions) {
+    ImmutableMultimap.Builder<Integer, Short> resultBuilder = ImmutableMultimap.builder();
+    for (int regionIndex : regionIndices) {
+      if (regionIndex < 0 || regionIndex >= regions.length) {
+        continue;
+      }
+
+      RegionInfo region = regions[regionIndex];
+
+      if (!region.getTable().getNamespaceAsString().equals("default")) {
+        continue;
+      }
+
+      HubSpotCellUtilities.range(region.getStartKey(), region.getEndKey(),
+        HubSpotCellUtilities.MAX_CELL_COUNT).forEach(cell -> resultBuilder.put(regionIndex, cell));
+    }
+    return resultBuilder.build();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixCostFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixCostFunction.java
new file mode 100644
index 000000000000..ba3ffb38b9f2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixCostFunction.java
@@ -0,0 +1,117 @@
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.hubspot.HubSpotCellUtilities;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+abstract class PrefixCostFunction extends CostFunction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PrefixCostFunction.class);
+
+  public static final String PREFIX_DISPERSION =
+    "hbase.master.balancer.stochastic.prefixDispersion";
+  public static final float DEFAULT_PREFIX_DISPERSION = 0.50f;
+
+  private float targetPrefixDispersion = 0.0f;
+  private int targetPrefixCountPerServer;
+
+  private double[] serverCosts;
+
+  private boolean costUpdated = false;
+  private double cost;
+
+  private Set<TableName> tableNames = new HashSet<>();
+
+  void emitClusterState() {
+    if (LOG.isTraceEnabled() && isNeeded() && cluster.regions != null
+      && cluster.regions.length > 0) {
+      try {
+        LOG.trace("{} cluster state @ target dispersion of {} ({} per server):\n{}",
+          cluster.tables, targetPrefixDispersion, targetPrefixCountPerServer,
+          HubSpotCellUtilities.OBJECT_MAPPER.toJson(cluster));
+      } catch (Exception ex) {
+        LOG.error("Failed to write cluster state", ex);
+      }
+    }
+  }
+
+  void setTargetPrefixDispersion(float dispersion) {
+    this.targetPrefixDispersion = dispersion;
+  }
+
+  @Override
+  void prepare(final BalancerClusterState cluster) {
+    super.prepare(cluster);
+    this.tableNames = new HashSet<>();
+    Arrays.stream(cluster.regions).forEach(region -> tableNames.add(region.getTable()));
+
+    if (!isNeeded()) {
+      return;
+    }
+
+    float averageRegionsPerServer = (float) cluster.numRegions / cluster.numServers;
+    targetPrefixCountPerServer =
+      Math.max(1, Math.round(averageRegionsPerServer * targetPrefixDispersion));
+
+    serverCosts = new double[cluster.numServers];
+    for (int server = 0; server < serverCosts.length; server++) {
+      serverCosts[server] = computeServerCost(server);
+    }
+    costUpdated = true;
+
+    double startingCost = cost();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Preparing {} for {}, dispersion of {}, {} total regions, "
+          + "average of {} regions/server, target prefix count per server is {}. "
+          + "Initial cluster cost is {}",
+        getClass().getSimpleName(), cluster.tables,
+        String.format("%.2f", targetPrefixDispersion), cluster.numRegions,
+        String.format("%.2f", averageRegionsPerServer), targetPrefixCountPerServer,
+        startingCost);
+    }
+
+    emitClusterState();
+  }
+
+  @Override
+  boolean isNeeded() {
+    return getMultiplier() > 0.0 && tableNames.stream().noneMatch(TableName::isSystemTable);
+  }
+
+  private double computeServerCost(int server) {
+    if (cluster.regionsPerServer[server].length == 0) {
+      LOG.warn("[{}] Server {} - {} has no known regions", tableNames, server,
+        cluster.servers[server].getServerName());
+      return targetPrefixDispersion;
+    }
+
+    int distinctPrefixes = (int) Arrays.stream(cluster.regionsPerServer[server])
+      .mapToObj(regionIdx -> cluster.regions[regionIdx])
+      .flatMap(region -> HubSpotCellUtilities.toCells(region.getStartKey(), region.getEndKey(),
+        HubSpotCellUtilities.MAX_CELL_COUNT).stream())
+      .distinct().count();
+    double serverDispersion = (double) distinctPrefixes / cluster.regionsPerServer[server].length;
+
+    return computeServerCost(serverDispersion, targetPrefixDispersion);
+  }
+
+  abstract double computeServerCost(double serverDispersion, double targetDispersion);
+
+  @Override
+  protected void regionMoved(int region, int oldServer, int newServer) {
+    // recompute the stat for the given two region servers
+    serverCosts[oldServer] = computeServerCost(oldServer);
+    serverCosts[newServer] = computeServerCost(newServer);
+    costUpdated = true;
+  }
+
+  @Override
+  protected final double cost() {
+    if (!costUpdated) {
+      return cost;
+    }
+
+    cost = Arrays.stream(serverCosts).sum() / cluster.numServers;
+    costUpdated = false;
+
+    return cost;
+  }
+}
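Reviewer note: the abstract cost above is specialized by PrefixIsolationCostFunction and
PrefixPerformanceCostFunction further down, which penalize opposite sides of the same
dispersion target. A hedged numeric sketch (values illustrative only):

public class DispersionCostDemo {
  // isolation penalizes dispersion above the target; performance penalizes below it
  static double isolationCost(double d, double target) { return Math.max(0, d - target); }
  static double performanceCost(double d, double target) { return Math.max(0, target - d); }

  public static void main(String[] args) {
    double target = 0.50;
    // a server with 30 regions covering 12 distinct prefixes: dispersion 0.40
    double d = 12.0 / 30.0;
    System.out.println(isolationCost(d, target));   // 0.0  -> already isolated enough
    System.out.println(performanceCost(d, target)); // ~0.1 -> slightly under-dispersed
  }
}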
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixIsolationCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixIsolationCandidateGenerator.java
new file mode 100644
index 000000000000..019116aaeb77
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixIsolationCandidateGenerator.java
@@ -0,0 +1,103 @@
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.hadoop.hbase.hubspot.HubSpotCellUtilities;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InterfaceAudience.Private
+class PrefixIsolationCandidateGenerator extends PrefixCandidateGenerator {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(PrefixIsolationCandidateGenerator.class);
+
+  @Override
+  BalanceAction generate(BalancerClusterState cluster) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+        "Running PrefixIsolationCandidateGenerator with {} servers and {} regions for tables {}",
+        cluster.regionsPerServer.length, cluster.regions.length, cluster.tables);
+    }
+
+    int[] cellCounts = new int[HubSpotCellUtilities.MAX_CELL_COUNT];
+    Arrays.stream(cluster.regions)
+      .flatMap(region -> HubSpotCellUtilities.toCells(region.getStartKey(), region.getEndKey(),
+        HubSpotCellUtilities.MAX_CELL_COUNT).stream())
+      .forEach(cellOnRegion -> cellCounts[cellOnRegion]++);
+    double[] cellPercents = new double[HubSpotCellUtilities.MAX_CELL_COUNT];
+    for (int i = 0; i < cellCounts.length; i++) {
+      cellPercents[i] = (double) cellCounts[i] / cluster.numRegions;
+    }
+
+    List<Map<Short, Integer>> cellGroupSizesPerServer = Arrays.stream(cluster.regionsPerServer)
+      .map(regionsForServer -> computeCellGroupSizes(cluster, regionsForServer))
+      .collect(Collectors.toList());
+
+    return generateAction(cluster, cellCounts, cellGroupSizesPerServer);
+  }
+
+  private BalanceAction generateAction(BalancerClusterState cluster, int[] cellCounts,
+    List<Map<Short, Integer>> cellGroupSizesPerServer) {
+    int targetRegionsPerServer =
+      Ints.checkedCast((long) Math.floor((double) cluster.numRegions / cluster.numServers));
+
+    BalanceAction moveRegionToUnderloadedServer = tryMoveRegionToSomeUnderloadedServer(cluster,
+      cellCounts, cellGroupSizesPerServer, targetRegionsPerServer);
+
+    if (moveRegionToUnderloadedServer != BalanceAction.NULL_ACTION) {
+      return moveRegionToUnderloadedServer;
+    }
+
+    BalanceAction moveRegionFromOverloadedServer = tryMoveRegionFromSomeOverloadedServer(cluster,
+      cellCounts, cellGroupSizesPerServer, targetRegionsPerServer);
+
+    if (moveRegionFromOverloadedServer != BalanceAction.NULL_ACTION) {
+      return moveRegionFromOverloadedServer;
+    }
+
+    int numTimesCellRegionsFillAllServers = 0;
+    for (int cell = 0; cell < HubSpotCellUtilities.MAX_CELL_COUNT; cell++) {
+      int numRegionsForCell = cellCounts[cell];
+      numTimesCellRegionsFillAllServers +=
+        Ints.checkedCast((long) Math.floor((double) numRegionsForCell / cluster.numServers));
+    }
+
+    int targetCellsPerServer = targetRegionsPerServer - numTimesCellRegionsFillAllServers;
+    targetCellsPerServer =
+      Math.min(targetCellsPerServer, HubSpotCellUtilities.getMaxCellsPerRs(cluster.numServers));
+    Set<Integer> serversBelowTarget = new HashSet<>();
+    Set<Integer> serversAboveTarget = new HashSet<>();
+
+    for (int server = 0; server < cluster.numServers; server++) {
+      int numCellsOnServer = cellGroupSizesPerServer.get(server).keySet().size();
+      if (numCellsOnServer < targetCellsPerServer) {
+        serversBelowTarget.add(server);
+      } else if (numCellsOnServer > targetCellsPerServer) {
+        serversAboveTarget.add(server);
+      }
+    }
+
+    if (serversBelowTarget.isEmpty() && serversAboveTarget.isEmpty()) {
+      return BalanceAction.NULL_ACTION;
+    } else if (!serversAboveTarget.isEmpty()) {
+      return swapRegionsToDecreaseDistinctCellsPerServer(cluster, cellCounts,
+        cellGroupSizesPerServer, targetCellsPerServer);
+    } else {
+      return swapRegionsToIncreaseDistinctCellsPerServer(cluster, cellCounts,
+        cellGroupSizesPerServer, targetCellsPerServer);
+    }
+  }
+}
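Reviewer note: a worked example of the targetCellsPerServer arithmetic in generateAction
above, under assumed (illustrative) cluster dimensions:

public class TargetCellsDemo {
  public static void main(String[] args) {
    int numRegions = 3600, numServers = 90, cells = 360, regionsPerCell = 10;
    int targetRegionsPerServer = numRegions / numServers;  // 40
    // each cell has floor(10 / 90) = 0 full laps around the cluster, so no cell is
    // forced onto every server and numTimesCellRegionsFillAllServers stays 0
    int fills = cells * (regionsPerCell / numServers);     // 0
    int maxCellsPerRs = Math.max(72, cells / numServers);  // getMaxCellsPerRs(90) = 72
    int targetCellsPerServer = Math.min(targetRegionsPerServer - fills, maxCellsPerRs);
    System.out.println(targetCellsPerServer);              // 40 distinct cells per server
  }
}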
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixIsolationCostFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixIsolationCostFunction.java
new file mode 100644
index 000000000000..e0b2efe6e22f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefixIsolationCostFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class PrefixIsolationCostFunction extends PrefixCostFunction {
+
+  private static final String PREFIX_ISOLATION_COST =
+    "hbase.master.balancer.stochastic.prefixIsolationCost";
+  private static final float DEFAULT_PREFIX_ISOLATION_COST = 0;
+
+  PrefixIsolationCostFunction(Configuration conf) {
+    this.setMultiplier(conf.getFloat(PREFIX_ISOLATION_COST, DEFAULT_PREFIX_ISOLATION_COST));
+    this.setTargetPrefixDispersion(conf.getFloat(PREFIX_DISPERSION, DEFAULT_PREFIX_DISPERSION));
+  }
+
+  @Override
+  double computeServerCost(double serverDispersion, double targetDispersion) {
+    return Math.max(0, serverDispersion - targetDispersion);
+  }
+
+  @Override
+  public final void updateWeight(double[] weights) {
+    weights[StochasticLoadBalancer.GeneratorType.RANDOM.ordinal()] += cost();
+  }
+}
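Reviewer note: both prefix cost multipliers default to 0, so this feature is off unless
explicitly configured. A hedged sketch of enabling it programmatically (in production these
floats would live in hbase-site.xml; the multiplier values here are arbitrary):

import org.apache.hadoop.conf.Configuration;

public class PrefixBalancerConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setFloat("hbase.master.balancer.stochastic.prefixIsolationCost", 10000f);
    conf.setFloat("hbase.master.balancer.stochastic.prefixPerformanceCost", 5000f);
    conf.setFloat("hbase.master.balancer.stochastic.prefixDispersion", 0.50f);
    // StochasticLoadBalancer.loadConf(conf) constructs both cost functions from these keys
  }
}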
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private public class PrefixPerformanceCostFunction extends PrefixCostFunction { + + private static final String PREFIX_PERFORMANCE_COST = + "hbase.master.balancer.stochastic.prefixPerformanceCost"; + + private static final float DEFAULT_PREFIX_PERFORMANCE_COST = 0; + + PrefixPerformanceCostFunction(Configuration conf) { + this.setMultiplier(conf.getFloat(PREFIX_PERFORMANCE_COST, DEFAULT_PREFIX_PERFORMANCE_COST)); + this.setTargetPrefixDispersion(conf.getFloat(PREFIX_DISPERSION, + DEFAULT_PREFIX_DISPERSION)); + } + + @Override double computeServerCost(double serverDispersion, double targetDispersion) { + return Math.max(0, targetDispersion - serverDispersion); + } + + @Override public final void updateWeight(double[] weights) { + weights[StochasticLoadBalancer.GeneratorType.RANDOM.ordinal()] += cost(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index ab873380268d..85b15599e580 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -245,11 +245,15 @@ protected List getTopBlockLocations(RegionInfo region, String curren * @return ordered list of hosts holding blocks of the specified region */ protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region) { + String regionNameAsString = region.getRegionNameAsString(); + LOG.debug("Fetching top block locations for {}", regionNameAsString); try { TableDescriptor tableDescriptor = getTableDescriptor(region.getTable()); if (tableDescriptor != null) { + LOG.debug("Region {} is located on {}", regionNameAsString, tableDescriptor.getTableName().getNameAsString()); HDFSBlocksDistribution blocksDistribution = HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region); + LOG.debug("Top hosts for region {}: {}", regionNameAsString, blocksDistribution.getTopHosts()); return blocksDistribution; } } catch (IOException ioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 607c5162ba47..41e1e4a67a68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -167,6 +167,10 @@ public enum GeneratorType { private RegionReplicaHostCostFunction regionReplicaHostCostFunction; private RegionReplicaRackCostFunction regionReplicaRackCostFunction; + // HubSpot addition + private PrefixIsolationCostFunction prefixIsolationCostFunction; + private PrefixPerformanceCostFunction prefixPerformanceCostFunction; + /** * Use to add balancer decision history to ring-buffer */ @@ -224,12 +228,12 @@ List getCandidateGenerators() { } protected List createCandidateGenerators() { + // HubSpot addition List candidateGenerators = new ArrayList(4); candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator()); candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator()); 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 607c5162ba47..41e1e4a67a68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -167,6 +167,10 @@ public enum GeneratorType {
   private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
   private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
 
+  // HubSpot addition
+  private PrefixIsolationCostFunction prefixIsolationCostFunction;
+  private PrefixPerformanceCostFunction prefixPerformanceCostFunction;
+
   /**
    * Use to add balancer decision history to ring-buffer
    */
@@ -224,12 +228,12 @@ List<CandidateGenerator> getCandidateGenerators() {
   }
 
   protected List<CandidateGenerator> createCandidateGenerators() {
+    // HubSpot addition
     List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
     candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
     candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
     candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
-    candidateGenerators.add(GeneratorType.RACK.ordinal(),
-      new RegionReplicaRackCandidateGenerator());
+    candidateGenerators.add(GeneratorType.RACK.ordinal(), new RegionReplicaRackCandidateGenerator());
     return candidateGenerators;
   }
 
@@ -247,6 +251,9 @@ protected void loadConf(Configuration conf) {
     localityCost = new ServerLocalityCostFunction(conf);
     rackLocalityCost = new RackLocalityCostFunction(conf);
 
+    // HubSpot addition:
+    prefixPerformanceCostFunction = new PrefixPerformanceCostFunction(conf);
+    prefixIsolationCostFunction = new PrefixIsolationCostFunction(conf);
     this.candidateGenerators = createCandidateGenerators();
     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
@@ -264,6 +271,11 @@ protected void loadConf(Configuration conf) {
     addCostFunction(new WriteRequestCostFunction(conf));
     addCostFunction(new MemStoreSizeCostFunction(conf));
     addCostFunction(new StoreFileCostFunction(conf));
+
+    // HubSpot addition:
+    addCostFunction(prefixIsolationCostFunction);
+    addCostFunction(prefixPerformanceCostFunction);
+
     loadCustomCostFunctions(conf);
 
     curFunctionCosts = new double[costFunctions.size()];
@@ -308,9 +320,12 @@ public synchronized void updateClusterMetrics(ClusterMetrics st) {
   private void updateBalancerTableLoadInfo(TableName tableName,
     Map<ServerName, List<RegionInfo>> loadOfOneTable) {
     RegionLocationFinder finder = null;
+    // HubSpot addition:
     if (
       (this.localityCost != null && this.localityCost.getMultiplier() > 0)
         || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)
+        || (this.prefixIsolationCostFunction != null && this.prefixIsolationCostFunction.getMultiplier() > 0)
+        || (this.prefixPerformanceCostFunction != null && this.prefixPerformanceCostFunction.getMultiplier() > 0)
     ) {
       finder = this.regionFinder;
     }
@@ -426,7 +441,11 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
   @RestrictedApi(explanation = "Should only be called in tests", link = "",
     allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
   BalanceAction nextAction(BalancerClusterState cluster) {
-    return getRandomGenerator().generate(cluster);
+    CandidateGenerator generator = getRandomGenerator();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Using generator {}", generator.getClass().getSimpleName());
+    }
+    return generator.generate(cluster);
   }
 
   /**
@@ -452,6 +471,7 @@ protected CandidateGenerator getRandomGenerator() {
         return candidateGenerators.get(i);
       }
     }
+    return candidateGenerators.get(candidateGenerators.size() - 1);
   }
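The single line added to getRandomGenerator() above is a fallback for a weighted random pick: if floating-point rounding lets the cumulative-weight loop finish without crossing the threshold, the last generator is returned rather than falling off the end. The method body is mostly elided in this hunk, so the following is only a sketch of that contract under assumed semantics (hypothetical names):

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    public class WeightedPickDemo {
      static <T> T pickWeighted(List<T> items, double[] weights) {
        double total = 0;
        for (double w : weights) {
          total += w;
        }
        double r = ThreadLocalRandom.current().nextDouble() * total;
        double cumulative = 0;
        for (int i = 0; i < items.size(); i++) {
          cumulative += weights[i];
          if (r < cumulative) {
            return items.get(i);
          }
        }
        // fallback, mirroring the patch: rounding can leave r >= cumulative
        return items.get(items.size() - 1);
      }

      public static void main(String[] args) {
        List<String> generators = List.of("random", "load", "locality", "rack");
        double[] weights = { 1.0, 3.0, 0.0, 0.5 };
        System.out.println(pickWeighted(generators, weights));
      }
    }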
@@ -490,11 +510,16 @@ protected List<RegionPlan> balanceTable(TableName tableName,
     // Allow turning this feature off if the locality cost is not going to
     // be used in any computations.
     RegionLocationFinder finder = null;
+    // HubSpot addition:
     if (
       (this.localityCost != null && this.localityCost.getMultiplier() > 0)
         || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)
+        || (this.prefixIsolationCostFunction != null && this.prefixIsolationCostFunction.getMultiplier() > 0)
+        || (this.prefixPerformanceCostFunction != null && this.prefixPerformanceCostFunction.getMultiplier() > 0)
     ) {
       finder = this.regionFinder;
+    } else {
+      LOG.debug("No active cost function needs the region location finder; leaving it disabled");
     }
 
     // The clusterState that is given to this method contains the state
@@ -537,17 +562,17 @@ protected List<RegionPlan> balanceTable(TableName tableName,
       computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
       if (calculatedMaxSteps > maxSteps) {
         LOG.warn(
-          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
+          "[{}] calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
            + "(This config change does not require service restart)",
-          calculatedMaxSteps, maxSteps);
+          tableName.getNameWithNamespaceInclAsString(), calculatedMaxSteps, maxSteps);
       }
     }
     LOG.info(
-      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
+      "[{}] Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
        + "functionCost={} computedMaxSteps={}",
-      currentCost / sumMultiplier, functionCost(), computedMaxSteps);
+      tableName.getNameWithNamespaceInclAsString(), currentCost / sumMultiplier, functionCost(), computedMaxSteps);
 
     final String initFunctionTotalCosts = totalCostsPerFunc();
     // Perform a stochastic walk to see if we can get a good fit.
@@ -567,12 +592,17 @@ protected List<RegionPlan> balanceTable(TableName tableName,
 
       // Should this be kept?
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("  S[{}]: {} -> {} via {} -- {}", step, currentCost, newCost, action,
+          totalCostsPerFunc());
+      }
+
       if (newCost < currentCost) {
         currentCost = newCost;
 
         // save for JMX
         curOverallCost = currentCost;
         System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
       } else {
         // Put things back the way they were before.
         // TODO: undo by remembering old values
         BalanceAction undoAction = action.undoAction();
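The walk above is strictly greedy: a candidate action is kept only when it lowers the weighted cost, and is otherwise undone before the next step. (The trace line is hoisted in front of the branch here, since the patch originally logged the identical message in both arms.) A self-contained toy version of that accept/undo structure (simplified types and hypothetical names; the real loop works on BalanceAction and per-function cost arrays):

    import java.util.Random;

    public class GreedyWalkDemo {
      // toy cost: squared deviation from a perfectly even load
      static double cost(int[] load) {
        double mean = 0, c = 0;
        for (int l : load) mean += l;
        mean /= load.length;
        for (int l : load) c += (l - mean) * (l - mean);
        return c;
      }

      public static void main(String[] args) {
        Random rnd = new Random(42);
        int[] load = { 9, 1, 5, 5 };
        double currentCost = cost(load);
        for (int step = 0; step < 1000; step++) {
          int from = rnd.nextInt(load.length), to = rnd.nextInt(load.length);
          if (load[from] == 0) continue;
          load[from]--; load[to]++;              // apply candidate move
          double newCost = cost(load);
          if (newCost < currentCost) {
            currentCost = newCost;               // keep the move
          } else {
            load[to]--; load[from]++;            // undo, as in the loop above
          }
        }
        System.out.println("final cost=" + currentCost);
      }
    }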
@@ -592,19 +627,19 @@ protected List<RegionPlan> balanceTable(TableName tableName,
       updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
       plans = createRegionPlans(cluster);
       LOG.info(
-        "Finished computing new moving plan. Computation took {} ms"
+        "[{}] Finished computing new moving plan. Computation took {} ms"
          + " to try {} different iterations. Found a solution that moves "
          + "{} regions; Going from a computed imbalance of {}"
-         + " to a new imbalance of {}. funtionCost={}",
-        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
+         + " to a new imbalance of {}. functionCost={}",
+        tableName.getNameWithNamespaceInclAsString(), endTime - startTime, step, plans.size(), initCost / sumMultiplier,
         currentCost / sumMultiplier, functionCost());
       sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
       return plans;
     }
 
     LOG.info(
-      "Could not find a better moving plan. Tried {} different configurations in "
+      "[{}] Could not find a better moving plan. Tried {} different configurations in "
        + "{} ms, and did not find anything with an imbalance score less than {}",
-      step, endTime - startTime, initCost / sumMultiplier);
+      tableName.getNameWithNamespaceInclAsString(), step, endTime - startTime, initCost / sumMultiplier);
     return null;
   }
 
@@ -776,7 +811,9 @@ void initCosts(BalancerClusterState cluster) {
     weightsOfGenerators = new double[this.candidateGenerators.size()];
     for (CostFunction c : costFunctions) {
       c.prepare(cluster);
-      c.updateWeight(weightsOfGenerators);
+      if (c.isNeeded()) {
+        c.updateWeight(weightsOfGenerators);
+      }
     }
   }
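The isNeeded() guard added to initCosts() matters because generator weights steer which candidate generator gets picked: a cost function disabled by a zero multiplier should contribute no weight. A minimal sketch with a stand-in interface (hypothetical; the real CostFunction API is only partially visible in this patch):

    public class WeightGateDemo {
      interface MiniCostFunction {
        boolean isNeeded();
        void updateWeight(double[] weights);
      }

      static void initWeights(double[] weights, MiniCostFunction... functions) {
        for (MiniCostFunction f : functions) {
          if (f.isNeeded()) {        // the patch's guard
            f.updateWeight(weights);
          }
        }
      }

      public static void main(String[] args) {
        double[] weights = new double[4];
        MiniCostFunction disabled = new MiniCostFunction() {
          public boolean isNeeded() { return false; }
          public void updateWeight(double[] w) { w[0] += 100; } // would skew selection
        };
        initWeights(weights, disabled);
        System.out.println(weights[0]); // 0.0 -- disabled function left no weight
      }
    }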
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
index f97622b40631..2e18740d3d30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
@@ -59,8 +59,8 @@ public static RegionNormalizerManager createNormalizerManager(final Configuratio
   private static RegionNormalizer getRegionNormalizer(Configuration conf) {
     // Create instance of Region Normalizer
     Class<? extends RegionNormalizer> balancerKlass =
-        conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class,
-          RegionNormalizer.class);
+      conf.getClass(HConstants.HBASE_MASTER_NORMALIZER_CLASS, SimpleRegionNormalizer.class,
+        RegionNormalizer.class);
     return ReflectionUtils.newInstance(balancerKlass, conf);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index a0c296de88f4..d1c1685388ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -25,9 +25,11 @@
 import java.time.Period;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
@@ -41,10 +43,12 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.hubspot.HubSpotCellUtilities;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -352,6 +356,8 @@ private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeCo
       return Collections.emptyList();
     }
 
+    // HubSpot addition: is this a cell-aware table?
+    final boolean isCellAwareTable = HubSpotCellUtilities.CELL_AWARE_TABLES.contains(ctx.tableName.getNameAsString());
     final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
     if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb(ctx)) {
       return Collections.emptyList();
@@ -373,6 +379,8 @@ private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeCo
       // walk the region chain looking for contiguous sequences of regions that can be merged.
       rangeMembers.clear();
       sumRangeMembersSizeMb = 0;
+      // HubSpot addition
+      Set<Short> cellsInRange = new HashSet<>();
       for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
         final RegionInfo regionInfo = ctx.getTableRegions().get(current);
         final long regionSizeMb = getRegionSizeMB(regionInfo);
@@ -395,6 +403,17 @@ private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeCo
           // to the range when
           // there's capacity
           // remaining.
+          // HubSpot addition: for cell-aware tables, don't merge across cell boundaries
+          if (isCellAwareTable) {
+            Set<Short> regionCells =
+              HubSpotCellUtilities.range(regionInfo.getStartKey(), regionInfo.getEndKey());
+            if (cellsInRange.isEmpty()) {
+              cellsInRange.addAll(regionCells);
+            } else if (!Sets.difference(regionCells, cellsInRange).isEmpty()) {
+              // region spans cells outside the current range; not mergeable - back to the outer loop
+              break;
+            }
+          }
           rangeMembers.add(new NormalizationTarget(regionInfo, regionSizeMb));
           sumRangeMembersSizeMb += regionSizeMb;
           continue;
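The guard above admits a region into the running merge range only if it introduces no cell beyond the cells already present in the range. A standalone sketch of that set test (plain java.util in place of the shaded Guava Sets.difference; Short cell ids are an assumption based on the test fixtures below):

    import java.util.HashSet;
    import java.util.Set;

    public class CellMergeGuardDemo {
      static boolean canJoinRange(Set<Short> cellsInRange, Set<Short> regionCells) {
        if (cellsInRange.isEmpty()) {
          return true; // the first region seeds the range
        }
        Set<Short> extra = new HashSet<>(regionCells);
        extra.removeAll(cellsInRange); // equivalent of Sets.difference(regionCells, cellsInRange)
        return extra.isEmpty();
      }

      public static void main(String[] args) {
        Set<Short> range = new HashSet<>(Set.of((short) 1, (short) 2));
        System.out.println(canJoinRange(range, Set.of((short) 2)));            // true
        System.out.println(canJoinRange(range, Set.of((short) 2, (short) 3))); // false -> break
      }
    }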
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestHubSpotCellCostFunction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestHubSpotCellCostFunction.java
new file mode 100644
index 000000000000..569f5572bb1a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestHubSpotCellCostFunction.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.function.Function;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.hubspot.HubSpotCellUtilities;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestHubSpotCellCostFunction {
+
+  private static final Function<RegionInfo, Integer> ALL_REGIONS_SIZE_1_MB = x -> 1;
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHubSpotCellCostFunction.class);
+
+  @Test
+  public void testCellCountTypical() {
+    int numCells = HubSpotCellUtilities.calcNumCells(
+      new RegionInfo[] { buildRegionInfo((short) 0, (short) 1),
+        buildRegionInfo((short) 1, (short) 2), buildRegionInfo((short) 2, (short) 3) },
+      (short) 3);
+    assertEquals(3, numCells);
+  }
+
+  @Test
+  public void testCellCountMultipleInRegion() {
+    int numCells = HubSpotCellUtilities.calcNumCells(
+      new RegionInfo[] { buildRegionInfo((short) 0, (short) 1),
+        buildRegionInfo((short) 1, (short) 2), buildRegionInfo((short) 2, (short) 4),
+        buildRegionInfo((short) 4, (short) 5) },
+      (short) 5);
+    assertEquals(5, numCells);
+  }
+
+  @Test
+  public void testCellCountMultipleInLastRegion() {
+    int numCells = HubSpotCellUtilities.calcNumCells(
+      new RegionInfo[] { buildRegionInfo((short) 0, (short) 1),
+        buildRegionInfo((short) 1, (short) 2), buildRegionInfo((short) 2, (short) 3),
+        buildRegionInfo((short) 3, (short) 5) },
+      (short) 5);
+    assertEquals(5, numCells);
+  }
+
+  @Test
+  public void testCellCountMultipleInFirstRegion() {
+    int numCells = HubSpotCellUtilities.calcNumCells(
+      new RegionInfo[] { buildRegionInfo((short) 0, (short) 2),
+        buildRegionInfo((short) 2, (short) 3), buildRegionInfo((short) 3, (short) 4),
+        buildRegionInfo((short) 4, (short) 5) },
+      (short) 5);
+    assertEquals(5, numCells);
+  }
+
+  @Test
+  public void testCellCountLastKeyNull() {
+    int numCells = HubSpotCellUtilities.calcNumCells(
+      new RegionInfo[] { buildRegionInfo((short) 0, (short) 1),
+        buildRegionInfo((short) 1, (short) 2), buildRegionInfo((short) 2, (short) 3),
+        buildRegionInfo((short) 3, null) },
+      (short) 4);
+    assertEquals(4, numCells);
+  }
+
+  @Test
+  public void testCellCountFirstKeyNull() {
+    int numCells = HubSpotCellUtilities.calcNumCells(
+      new RegionInfo[] { buildRegionInfo(null, (short) 1), buildRegionInfo((short) 1, (short) 2),
+        buildRegionInfo((short) 2, (short) 3), buildRegionInfo((short) 3, (short) 4) },
+      (short) 4);
+    assertEquals(4, numCells);
+  }
+
+  @Test
+  public void testCellCountBothEndsNull() {
+    int numCells = HubSpotCellUtilities.calcNumCells(
+      new RegionInfo[] { buildRegionInfo(null, (short) 1), buildRegionInfo((short) 1, (short) 2),
+        buildRegionInfo((short) 2, (short) 3), buildRegionInfo((short) 3, null) },
+      (short) 4);
+    assertEquals(4, numCells);
+  }
+
+  private RegionInfo buildRegionInfo(Short startCell, Short stopCell) {
+    return RegionInfoBuilder.newBuilder(TableName.valueOf("table"))
+      .setStartKey(startCell == null ? null : Bytes.toBytes(startCell))
+      .setEndKey(stopCell == null ? null : Bytes.toBytes(stopCell))
+      .build();
+  }
+}
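The fixtures above encode a cell id as a short, so Bytes.toBytes(short) yields a two-byte big-endian region-key prefix, and a null start or end key stands for the open-ended first or last region. A small sketch of that round trip (how HubSpotCellUtilities actually decodes prefixes is not shown in this patch; this only illustrates the test fixtures):

    import org.apache.hadoop.hbase.util.Bytes;

    public class CellKeyDemo {
      public static void main(String[] args) {
        byte[] key = Bytes.toBytes((short) 3);
        System.out.println(key.length);          // 2 -- a fixed-width two-byte prefix
        System.out.println(Bytes.toShort(key));  // 3 -- round-trips the cell id
      }
    }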