From 8f79af05af4f9cde2c3ff6e720e769c2313790f7 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 20 Jun 2024 14:35:52 +0800 Subject: [PATCH 01/30] add AvgShedder. --- .../pulsar/broker/ServiceConfiguration.java | 34 +- .../broker/loadbalance/impl/AvgShedder.java | 316 ++++++++++++++++++ .../impl/ModularLoadManagerImpl.java | 7 +- 3 files changed, 353 insertions(+), 4 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 73bf2316b8287..086af0222f134 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2395,21 +2395,49 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, - doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload." + doc = "The low threshold for the difference between the highest and lowest loaded brokers." + ) + private int loadBalancerAvgShedderLowThreshold = 15; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "The high threshold for the difference between the highest and lowest loaded brokers." + ) + private int loadBalancerAvgShedderHighThreshold = 40; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "The number of times the low threshold is triggered before the bundle is unloaded." + ) + private int loadBalancerAvgShedderHitCountLowThreshold = 8; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "The number of times the high threshold is triggered before the bundle is unloaded." + ) + private int loadBalancerAvgShedderHitCountHighThreshold = 2; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum message that triggers unload." ) private int minUnloadMessage = 1000; @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, - doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload." + doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum throughput that triggers unload." ) private int minUnloadMessageThroughput = 1 * 1024 * 1024; @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, - doc = "In the UniformLoadShedder strategy, the maximum unload ratio." + doc = "In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio." ) private double maxUnloadPercentage = 0.2; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java new file mode 100644 index 0000000000000..b0278ac3a2ea6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import com.google.common.base.Charsets; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.hash.Hashing; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import org.apache.commons.lang3.mutable.MutableDouble; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy; +import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.policies.data.loadbalancer.BrokerData; +import org.apache.pulsar.policies.data.loadbalancer.BundleData; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrategy { + private static Logger log = LoggerFactory.getLogger(AvgShedder.class); + + // map bundle to broker. + Map bundleBrokerMap = new HashMap<>(); + // map broker to Scores. scores:0-100 + Map brokerScoreMap = new HashMap<>(); + // map broker hit count for high threshold/low threshold + Map brokerHitCountForHigh = new HashMap<>(); + Map brokerHitCountForLow = new HashMap<>(); + + // result returned by shedding, map broker to bundles. + private final Multimap selectedBundlesCache = ArrayListMultimap.create(); + + private static final double MB = 1024 * 1024; + private static final Random random = new Random(); + + public AvgShedder() { + } + + public AvgShedder(final ServiceConfiguration conf) { + } + + @Override + public Multimap findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) { + selectedBundlesCache.clear(); + final double minThroughputThreshold = conf.getMinUnloadMessageThroughput(); + final double minMsgThreshold = conf.getMinUnloadMessage(); + final double maxUnloadPercentage = conf.getMaxUnloadPercentage(); + + final Map recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); + final double lowThreshold = conf.getLoadBalancerAvgShedderLowThreshold(); + final double highThreshold = conf.getLoadBalancerAvgShedderHighThreshold(); + final int hitCountHighThreshold = conf.getLoadBalancerAvgShedderHitCountHighThreshold(); + final int hitCountLowThreshold = conf.getLoadBalancerAvgShedderHitCountLowThreshold(); + if (log.isDebugEnabled()) { + log.debug("highThreshold:{}, lowThreshold:{}, hitCountHighThreshold:{}, hitCountLowThreshold:{}, " + + "minMsgThreshold:{}, minThroughputThreshold:{}", + highThreshold, lowThreshold, hitCountHighThreshold, hitCountLowThreshold, + minMsgThreshold, minThroughputThreshold); + } + + List brokers = new LinkedList<>(); + brokerScoreMap.clear(); + // calculate scores of brokers. + for (Map.Entry entry : loadData.getBrokerData().entrySet()) { + LocalBrokerData localBrokerData = entry.getValue().getLocalData(); + String broker = entry.getKey(); + brokers.add(broker); + Double score = calculateScores(broker, localBrokerData, conf); + brokerScoreMap.put(broker, score); + // collect data to analyze the relations between scores and throughput and messageRate. + log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score, + localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(), + localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut()); + } + + // sort brokers by scores. + Collections.sort(brokers, (e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2))); + if (log.isDebugEnabled()) { + log.debug("sorted broker list:{}", brokers); + } + + // find broker pairs for shedding. + List> pairs = new LinkedList<>(); + int i = 0, j = brokers.size() - 1; + while (i <= j) { + String maxBroker = brokers.get(j); + String minBroker = brokers.get(i); + if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < lowThreshold) { + brokerHitCountForHigh.remove(maxBroker); + brokerHitCountForHigh.remove(minBroker); + + brokerHitCountForLow.remove(maxBroker); + brokerHitCountForLow.remove(minBroker); + } else { + pairs.add(Pair.of(minBroker, maxBroker)); + if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < highThreshold) { + brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); + brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); + + brokerHitCountForHigh.remove(maxBroker); + brokerHitCountForHigh.remove(minBroker); + } else { + brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); + brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); + + brokerHitCountForHigh.put(minBroker, brokerHitCountForHigh.getOrDefault(minBroker, 0) + 1); + brokerHitCountForHigh.put(maxBroker, brokerHitCountForHigh.getOrDefault(maxBroker, 0) + 1); + } + } + i++; + j--; + } + + log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}", + brokerHitCountForHigh, brokerHitCountForLow); + + if (pairs.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("there is no any overload broker.no need to shedding bundles."); + } + brokerHitCountForHigh.clear(); + brokerHitCountForLow.clear(); + return selectedBundlesCache; + } + + // choosing bundles to unload. + for (Pair pair : pairs) { + String overloadedBroker = pair.getRight(); + String underloadedBroker = pair.getLeft(); + + // check hit count for high threshold and low threshold. + if (!(brokerHitCountForHigh.getOrDefault(underloadedBroker, 0) >= hitCountHighThreshold) + && !(brokerHitCountForHigh.getOrDefault(overloadedBroker, 0) >= hitCountHighThreshold) + && !(brokerHitCountForLow.getOrDefault(underloadedBroker, 0) >= hitCountLowThreshold) + && !(brokerHitCountForLow.getOrDefault(overloadedBroker, 0) >= hitCountLowThreshold)) { + continue; + } + + // if hit, remove entry. + brokerHitCountForHigh.remove(underloadedBroker); + brokerHitCountForHigh.remove(overloadedBroker); + + brokerHitCountForLow.remove(underloadedBroker); + brokerHitCountForLow.remove(overloadedBroker); + + // calculate how much throughput to unload. + LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData(); + LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData(); + + double minMsgRate = minLocalBrokerData.getMsgRateIn() + minLocalBrokerData.getMsgRateOut(); + double maxMsgRate = maxLocalBrokerData.getMsgRateIn() + maxLocalBrokerData.getMsgRateOut(); + + double minThroughput = minLocalBrokerData.getMsgThroughputIn() + minLocalBrokerData.getMsgThroughputOut(); + double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut(); + + double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage; + double throuputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage; + + boolean isMsgRateToOffload; + MutableDouble trafficMarkedToOffload = new MutableDouble(0); + + if (msgRequiredFromUnloadedBundles > minMsgThreshold) { + isMsgRateToOffload = true; + trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles); + } else if (throuputRequiredFromUnloadedBundles > minThroughputThreshold) { + isMsgRateToOffload = false; + trafficMarkedToOffload.setValue(throuputRequiredFromUnloadedBundles); + } else { + log.info( + "broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is " + + "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s" + + " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.", + overloadedBroker, underloadedBroker, throuputRequiredFromUnloadedBundles / MB, + minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold); + continue; + } + + if (maxLocalBrokerData.getBundles().size() == 1) { + log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " + + "No Load Shedding will be done on this broker", + maxLocalBrokerData.getBundles().iterator().next(), pair.getRight()); + } else if (maxLocalBrokerData.getBundles().size() == 0) { + log.warn("Broker {} is overloaded despite having no bundles", pair.getRight()); + } + + // do shedding + log.info( + "broker:[{}] is planning to shed bundles to broker:[{}]. " + + "maxBroker stat:scores:{}, throughput:{}, msgRate:{}. " + + "minBroker stat:scores:{}, throughput:{}, msgRate:{}. " + + "isMsgRateToOffload:{}, trafficMarkedToOffload:{}", + overloadedBroker, underloadedBroker, brokerScoreMap.get(overloadedBroker), maxThroughput, + maxMsgRate, brokerScoreMap.get(underloadedBroker), minThroughput, minMsgRate, + isMsgRateToOffload, trafficMarkedToOffload); + + loadData.getBundleDataForLoadShedding().entrySet().stream().filter(e -> + maxLocalBrokerData.getBundles().contains(e.getKey()) + ).filter(e -> + !recentlyUnloadedBundles.containsKey(e.getKey()) + ).map((e) -> { + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double traffic = isMsgRateToOffload + ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut() + : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); + return Pair.of(e, traffic); + }).sorted((e1, e2) -> + Double.compare(e2.getRight(), e1.getRight()) + ).forEach(e -> { + Map.Entry bundle = e.getLeft(); + double traffic = e.getRight(); + if (traffic > 0 && traffic < trafficMarkedToOffload.getValue()) { + selectedBundlesCache.put(overloadedBroker, bundle.getKey()); + bundleBrokerMap.put(bundle.getValue(), underloadedBroker); + trafficMarkedToOffload.add(-traffic); + if (log.isDebugEnabled()) { + log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}", + bundle, isMsgRateToOffload, traffic); + } + } + }); + } + return selectedBundlesCache; + } + + @Override + public void onActiveBrokersChange(Set activeBrokers) { + LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers); + } + + private Double calculateScores(String broker, LocalBrokerData localBrokerData, final ServiceConfiguration conf) { + double score = localBrokerData.getMaxResourceUsageWithWeight( + conf.getLoadBalancerCPUResourceWeight(), + conf.getLoadBalancerDirectMemoryResourceWeight(), + conf.getLoadBalancerBandwithInResourceWeight(), + conf.getLoadBalancerBandwithOutResourceWeight()) * 100; + return score; + } + + @Override + public Optional selectBroker(Set candidates, BundleData bundleToAssign, LoadData loadData, + ServiceConfiguration conf){ + if (!bundleBrokerMap.containsKey(bundleToAssign) || !candidates.contains(bundleBrokerMap.get(bundleToAssign))) { + // cluster initializing or broker is shutdown + if (log.isDebugEnabled()) { + if (!bundleBrokerMap.containsKey(bundleToAssign)) { + log.debug("cluster is initializing"); + } else { + log.debug("expected broker:{} is shutdown, candidates:{}", bundleBrokerMap.get(bundleToAssign), + candidates); + } + } + String broker = getExpectedBroker(candidates, bundleToAssign); + bundleBrokerMap.put(bundleToAssign, broker); + return Optional.of(broker); + } else { + return Optional.of(bundleBrokerMap.get(bundleToAssign)); + } + } + + private static long hash(String key) { + return Hashing.crc32().hashString(key, Charsets.UTF_8).padToLong(); + } + + private static String getExpectedBroker(Collection brokers, BundleData bundle) { + List sortedBrokers = new ArrayList<>(brokers); + Collections.sort(sortedBrokers); + + try { + // add random number as input of hashing function to avoid special case that, + // if there is 4 brokers running in the cluster,and add broker5,and shutdown broker3, + // then all bundles belonging to broker3 will be loaded on the same broker. + final long hashcode = hash(NamespaceBundle.getBundleRange(bundle.toString()) + random.nextInt()); + final int index = (int) (Math.abs(hashcode) % sortedBrokers.size()); + if (log.isDebugEnabled()) { + log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}", + sortedBrokers, bundle, hashcode, index); + } + return sortedBrokers.get(index); + } catch (Throwable e) { + // theoretically this logic branch should not be executed + log.error("Bundle format of {} is invalid", bundle, e); + return sortedBrokers.get(Math.abs(bundle.hashCode()) % sortedBrokers.size()); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 3af372607cb16..fe4950af2be62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -270,7 +270,12 @@ public void initialize(final PulsarService pulsar) { () -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap)); }); - loadSheddingStrategy = createLoadSheddingStrategy(); + if (placementStrategy instanceof LoadSheddingStrategy) { + // bind the load shedding strategy and the placement strategy + loadSheddingStrategy = (LoadSheddingStrategy) placementStrategy; + } else { + loadSheddingStrategy = createLoadSheddingStrategy(); + } } public void handleDataNotification(Notification t) { From be38d4d63352095879c00e08d9201fb70a8dd984 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 20 Jun 2024 14:48:34 +0800 Subject: [PATCH 02/30] fix typo. --- .../apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index b0278ac3a2ea6..bab94e9646cfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -184,7 +184,7 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut(); double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage; - double throuputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage; + double throughputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage; boolean isMsgRateToOffload; MutableDouble trafficMarkedToOffload = new MutableDouble(0); @@ -192,15 +192,15 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi if (msgRequiredFromUnloadedBundles > minMsgThreshold) { isMsgRateToOffload = true; trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles); - } else if (throuputRequiredFromUnloadedBundles > minThroughputThreshold) { + } else if (throughputRequiredFromUnloadedBundles > minThroughputThreshold) { isMsgRateToOffload = false; - trafficMarkedToOffload.setValue(throuputRequiredFromUnloadedBundles); + trafficMarkedToOffload.setValue(throughputRequiredFromUnloadedBundles); } else { log.info( "broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is " + "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s" + " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.", - overloadedBroker, underloadedBroker, throuputRequiredFromUnloadedBundles / MB, + overloadedBroker, underloadedBroker, throughputRequiredFromUnloadedBundles / MB, minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold); continue; } From 79441596b8f0d779926eea6e6eb3ab12ba4ea414 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 27 Jun 2024 17:06:04 +0800 Subject: [PATCH 03/30] replace bundle name + random number with random number only. --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index bab94e9646cfd..770608a5b7585 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -297,10 +297,10 @@ private static String getExpectedBroker(Collection brokers, BundleData b Collections.sort(sortedBrokers); try { - // add random number as input of hashing function to avoid special case that, + // use random number as input of hashing function to avoid special case that, // if there is 4 brokers running in the cluster,and add broker5,and shutdown broker3, // then all bundles belonging to broker3 will be loaded on the same broker. - final long hashcode = hash(NamespaceBundle.getBundleRange(bundle.toString()) + random.nextInt()); + final long hashcode = hash(String.valueOf(random.nextInt())); final int index = (int) (Math.abs(hashcode) % sortedBrokers.size()); if (log.isDebugEnabled()) { log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}", From 4cf4f2b8d3794192c91461068d276dd124c10fec Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 12:07:27 +0800 Subject: [PATCH 04/30] add test code. --- .../broker/loadbalance/impl/AvgShedder.java | 14 +++++--------- .../ModularLoadManagerStrategyTest.java | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 770608a5b7585..0059c85d027d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -38,7 +38,6 @@ import org.apache.pulsar.broker.loadbalance.LoadData; import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy; import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy; -import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.policies.data.loadbalancer.BrokerData; import org.apache.pulsar.policies.data.loadbalancer.BundleData; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; @@ -50,12 +49,12 @@ public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrat private static Logger log = LoggerFactory.getLogger(AvgShedder.class); // map bundle to broker. - Map bundleBrokerMap = new HashMap<>(); + private Map bundleBrokerMap = new HashMap<>(); // map broker to Scores. scores:0-100 - Map brokerScoreMap = new HashMap<>(); + private Map brokerScoreMap = new HashMap<>(); // map broker hit count for high threshold/low threshold - Map brokerHitCountForHigh = new HashMap<>(); - Map brokerHitCountForLow = new HashMap<>(); + private Map brokerHitCountForHigh = new HashMap<>(); + private Map brokerHitCountForLow = new HashMap<>(); // result returned by shedding, map broker to bundles. private final Multimap selectedBundlesCache = ArrayListMultimap.create(); @@ -66,9 +65,6 @@ public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrat public AvgShedder() { } - public AvgShedder(final ServiceConfiguration conf) { - } - @Override public Multimap findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) { selectedBundlesCache.clear(); @@ -269,7 +265,7 @@ private Double calculateScores(String broker, LocalBrokerData localBrokerData, f @Override public Optional selectBroker(Set candidates, BundleData bundleToAssign, LoadData loadData, - ServiceConfiguration conf){ + ServiceConfiguration conf) { if (!bundleBrokerMap.containsKey(bundleToAssign) || !candidates.contains(bundleBrokerMap.get(bundleToAssign))) { // cluster initializing or broker is shutdown if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java index f5bd0f46a5ec1..818af35943f36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.impl.AvgShedder; import org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate; import org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight; import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector; @@ -47,6 +48,22 @@ @Test(groups = "broker") public class ModularLoadManagerStrategyTest { + public void testAvgShedder() throws Exception{ + ModularLoadManagerStrategy strategy = new AvgShedder(); + Field field = AvgShedder.class.getDeclaredField("bundleBrokerMap"); + field.setAccessible(true); + Map bundleBrokerMap = (Map) field.get(strategy); + BundleData bundleData = new BundleData(); + // assign bundle broker1 in bundleBrokerMap. + bundleBrokerMap.put(bundleData, "1"); + assertEquals(strategy.selectBroker(Set.of("1", "2", "3"), bundleData, null, null), Optional.of("1")); + assertEquals(bundleBrokerMap.get(bundleData), "1"); + + // remove broker1 in candidates, only broker2 is candidate. + assertEquals(strategy.selectBroker(Set.of("2"), bundleData, null, null), Optional.of("2")); + assertEquals(bundleBrokerMap.get(bundleData), "2"); + } + // Test that least long term message rate works correctly. public void testLeastLongTermMessageRate() { BundleData bundleData = new BundleData(); From 99caa976df8c786447b8ef9c03969da1ada33c09 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 15:02:19 +0800 Subject: [PATCH 05/30] add test code. --- .../broker/loadbalance/impl/AvgShedder.java | 2 +- .../loadbalance/impl/AvgShedderTest.java | 184 ++++++++++++++++++ 2 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 0059c85d027d0..436b5c69857ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -235,7 +235,7 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi ).forEach(e -> { Map.Entry bundle = e.getLeft(); double traffic = e.getRight(); - if (traffic > 0 && traffic < trafficMarkedToOffload.getValue()) { + if (traffic > 0 && traffic <= trafficMarkedToOffload.getValue()) { selectedBundlesCache.put(overloadedBroker, bundle.getKey()); bundleBrokerMap.put(bundle.getValue(), underloadedBroker); trafficMarkedToOffload.add(-traffic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java new file mode 100644 index 0000000000000..fa9c01173b078 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import com.google.common.collect.Multimap; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.policies.data.loadbalancer.BrokerData; +import org.apache.pulsar.policies.data.loadbalancer.BundleData; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +@Test(groups = "broker") +public class AvgShedderTest { + private AvgShedder avgShedder; + private final ServiceConfiguration conf; + + public AvgShedderTest() { + conf = new ServiceConfiguration(); + } + + @BeforeMethod + public void setup() { + avgShedder = new AvgShedder(); + } + + private BrokerData initBrokerData() { + LocalBrokerData localBrokerData = new LocalBrokerData(); + localBrokerData.setCpu(new ResourceUsage()); + localBrokerData.setMemory(new ResourceUsage()); + localBrokerData.setBandwidthIn(new ResourceUsage()); + localBrokerData.setBandwidthOut(new ResourceUsage()); + BrokerData brokerData = new BrokerData(localBrokerData); + TimeAverageBrokerData timeAverageBrokerData = new TimeAverageBrokerData(); + brokerData.setTimeAverageData(timeAverageBrokerData); + return brokerData; + } + + @Test + public void testHitHighThreshold() { + LoadData loadData = new LoadData(); + BrokerData brokerData1 = initBrokerData(); + BrokerData brokerData2 = initBrokerData(); + BrokerData brokerData3 = initBrokerData(); + loadData.getBrokerData().put("broker1", brokerData1); + loadData.getBrokerData().put("broker2", brokerData2); + loadData.getBrokerData().put("broker3", brokerData3); + // AvgShedder will distribute the load evenly between the highest and lowest brokers + conf.setMaxUnloadPercentage(0.5); + + // Set the high threshold to 40% and hit count high threshold to 2 + int hitCountForHighThreshold = 2; + conf.setLoadBalancerAvgShedderHighThreshold(40); + conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold); + brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100)); + brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100)); + brokerData1.getLocalData().setMsgRateIn(10000); + brokerData1.getLocalData().setMsgRateOut(10000); + brokerData2.getLocalData().setMsgRateIn(1000); + brokerData2.getLocalData().setMsgRateOut(1000); + + // broker3 is in the middle + brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100)); + brokerData3.getLocalData().setMsgRateIn(5000); + brokerData3.getLocalData().setMsgRateOut(5000); + + // expect to shed bundles with message rate(in+out) ((10000+10000)-(1000+1000))/2 = 9000 + // each bundle with 450 msg rate in and 450 msg rate out + // so 9000/(450+450)=10 bundles will be shed + for (int i = 0; i < 11; i++) { + brokerData1.getLocalData().getBundles().add("bundle-" + i); + BundleData bundle = new BundleData(); + TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle-" + i, bundle); + } + + // do shedding for the first time, expect to shed nothing because hit count is not enough + Multimap bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 0); + + // do shedding for the second time, expect to shed 10 bundles + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 10); + + // assert that all the bundles are shed from broker1 + for (String broker : bundlesToUnload.keys()) { + assertEquals(broker, "broker1"); + } + // assert that all the bundles are shed to broker2 + for (String bundle : bundlesToUnload.values()) { + BundleData bundleData = loadData.getBundleData().get(bundle); + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker2"); + } + } + + @Test + public void testHitLowThreshold() { + LoadData loadData = new LoadData(); + BrokerData brokerData1 = initBrokerData(); + BrokerData brokerData2 = initBrokerData(); + BrokerData brokerData3 = initBrokerData(); + loadData.getBrokerData().put("broker1", brokerData1); + loadData.getBrokerData().put("broker2", brokerData2); + loadData.getBrokerData().put("broker3", brokerData3); + // AvgShedder will distribute the load evenly between the highest and lowest brokers + conf.setMaxUnloadPercentage(0.5); + + // Set the high threshold to 40% and hit count high threshold to 2 + int hitCountForLowThreshold = 6; + conf.setLoadBalancerAvgShedderLowThreshold(20); + conf.setLoadBalancerAvgShedderHitCountLowThreshold(hitCountForLowThreshold); + brokerData1.getLocalData().setCpu(new ResourceUsage(60, 100)); + brokerData2.getLocalData().setCpu(new ResourceUsage(40, 100)); + brokerData1.getLocalData().setMsgRateIn(10000); + brokerData1.getLocalData().setMsgRateOut(10000); + brokerData2.getLocalData().setMsgRateIn(1000); + brokerData2.getLocalData().setMsgRateOut(1000); + + // broker3 is in the middle + brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100)); + brokerData3.getLocalData().setMsgRateIn(5000); + brokerData3.getLocalData().setMsgRateOut(5000); + + // expect to shed bundles with message rate(in+out) ((10000+10000)-(1000+1000))/2 = 9000 + // each bundle with 450 msg rate in and 450 msg rate out + // so 9000/(450+450)=10 bundles will be shed + for (int i = 0; i < 11; i++) { + brokerData1.getLocalData().getBundles().add("bundle-" + i); + BundleData bundle = new BundleData(); + TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle-" + i, bundle); + } + + // do shedding for (lowCountForHighThreshold - 1) times, expect to shed nothing because hit count is not enough + Multimap bundlesToUnload; + for (int i = 0; i < hitCountForLowThreshold - 1; i++) { + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 0); + } + + // do shedding for the last time, expect to shed 10 bundles + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 10); + + // assert that all the bundles are shed from broker1 + for (String broker : bundlesToUnload.keys()) { + assertEquals(broker, "broker1"); + } + // assert that all the bundles are shed to broker2 + for (String bundle : bundlesToUnload.values()) { + BundleData bundleData = loadData.getBundleData().get(bundle); + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker2"); + } + } + +} From b5e444bd0519a9a26a4d9d0948f9d0720dfe26eb Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 15:09:32 +0800 Subject: [PATCH 06/30] update conf. --- conf/broker.conf | 19 +++++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 2 ++ 2 files changed, 21 insertions(+) diff --git a/conf/broker.conf b/conf/broker.conf index 5c5d8d42817e9..b715c4e515bc8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1414,6 +1414,25 @@ loadBalancerBundleUnloadMinThroughputThreshold=10 # Time to wait for the unloading of a namespace bundle namespaceBundleUnloadingTimeoutMs=60000 +# configuration for AvgShedder, a new shedding and placement strategy +# The low threshold for the difference between the highest and lowest loaded brokers. +loadBalancerAvgShedderLowThreshold = 15 + +# The high threshold for the difference between the highest and lowest loaded brokers. +loadBalancerAvgShedderHighThreshold = 40 + +# The number of times the low threshold is triggered before the bundle is unloaded. +loadBalancerAvgShedderHitCountLowThreshold = 8 + +# The number of times the high threshold is triggered before the bundle is unloaded. +loadBalancerAvgShedderHitCountHighThreshold = 2 + +# In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio. +# For AvgShedder, recommend to set to 0.5, so that it will distribute the load evenly +# between the highest and lowest brokers. +maxUnloadPercentage = 0.2 + + ### --- Load balancer extension --- ### # Option to enable the debug mode for the load balancer logics. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 086af0222f134..e6e78d14260df 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2438,6 +2438,8 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece dynamic = true, category = CATEGORY_LOAD_BALANCER, doc = "In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio." + + "For AvgShedder, recommend to set to 0.5, so that it will distribute the load evenly" + + " between the highest and lowest brokers." ) private double maxUnloadPercentage = 0.2; From 6d4678abe5a7cff19d02a95b18a3f93b1d2d7318 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 15:13:27 +0800 Subject: [PATCH 07/30] fix typo. --- .../apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java index fa9c01173b078..4a97897edfc13 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java @@ -130,7 +130,7 @@ public void testHitLowThreshold() { // AvgShedder will distribute the load evenly between the highest and lowest brokers conf.setMaxUnloadPercentage(0.5); - // Set the high threshold to 40% and hit count high threshold to 2 + // Set the low threshold to 20% and hit count low threshold to 6 int hitCountForLowThreshold = 6; conf.setLoadBalancerAvgShedderLowThreshold(20); conf.setLoadBalancerAvgShedderHitCountLowThreshold(hitCountForLowThreshold); From 59907a80d7aeb81be1c6664e58759a7d03fe81d8 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 16:22:36 +0800 Subject: [PATCH 08/30] add test code. --- .../broker/loadbalance/impl/AvgShedder.java | 8 +- .../loadbalance/impl/AvgShedderTest.java | 99 +++++++++++++++++++ 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 436b5c69857ed..483f053afd5bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -49,12 +49,12 @@ public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrat private static Logger log = LoggerFactory.getLogger(AvgShedder.class); // map bundle to broker. - private Map bundleBrokerMap = new HashMap<>(); + private final Map bundleBrokerMap = new HashMap<>(); // map broker to Scores. scores:0-100 - private Map brokerScoreMap = new HashMap<>(); + private final Map brokerScoreMap = new HashMap<>(); // map broker hit count for high threshold/low threshold - private Map brokerHitCountForHigh = new HashMap<>(); - private Map brokerHitCountForLow = new HashMap<>(); + private final Map brokerHitCountForHigh = new HashMap<>(); + private final Map brokerHitCountForLow = new HashMap<>(); // result returned by shedding, map broker to bundles. private final Multimap selectedBundlesCache = ArrayListMultimap.create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java index 4a97897edfc13..215e3d766a927 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java @@ -31,6 +31,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; @Test(groups = "broker") public class AvgShedderTest { @@ -95,6 +96,9 @@ public void testHitHighThreshold() { TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); timeAverageMessageData.setMsgRateIn(450); timeAverageMessageData.setMsgRateOut(450); + // as AvgShedder map BundleData to broker, the hashCode of different BundleData should be different + // so we need to set some different fields to make the hashCode different + timeAverageMessageData.setNumSamples(i); bundle.setShortTermData(timeAverageMessageData); loadData.getBundleData().put("bundle-" + i, bundle); } @@ -155,6 +159,9 @@ public void testHitLowThreshold() { TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); timeAverageMessageData.setMsgRateIn(450); timeAverageMessageData.setMsgRateOut(450); + // as AvgShedder map BundleData to broker, the hashCode of different BundleData should be different + // so we need to set some different fields to make the hashCode different + timeAverageMessageData.setNumSamples(i); bundle.setShortTermData(timeAverageMessageData); loadData.getBundleData().put("bundle-" + i, bundle); } @@ -181,4 +188,96 @@ public void testHitLowThreshold() { } } + @Test + public void testSheddingMultiplePairs() { + LoadData loadData = new LoadData(); + BrokerData brokerData1 = initBrokerData(); + BrokerData brokerData2 = initBrokerData(); + BrokerData brokerData3 = initBrokerData(); + BrokerData brokerData4 = initBrokerData(); + loadData.getBrokerData().put("broker1", brokerData1); + loadData.getBrokerData().put("broker2", brokerData2); + loadData.getBrokerData().put("broker3", brokerData3); + loadData.getBrokerData().put("broker4", brokerData4); + // AvgShedder will distribute the load evenly between the highest and lowest brokers + conf.setMaxUnloadPercentage(0.5); + + // Set the high threshold to 40% and hit count high threshold to 2 + int hitCountForHighThreshold = 2; + conf.setLoadBalancerAvgShedderHighThreshold(40); + conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold); + + // pair broker1 and broker2 + brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100)); + brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100)); + brokerData1.getLocalData().setMsgRateIn(10000); + brokerData1.getLocalData().setMsgRateOut(10000); + brokerData2.getLocalData().setMsgRateIn(1000); + brokerData2.getLocalData().setMsgRateOut(1000); + + // pair broker3 and broker4 + brokerData3.getLocalData().setCpu(new ResourceUsage(75, 100)); + brokerData3.getLocalData().setMsgRateIn(10000); + brokerData3.getLocalData().setMsgRateOut(10000); + brokerData4.getLocalData().setCpu(new ResourceUsage(35, 100)); + brokerData4.getLocalData().setMsgRateIn(1000); + brokerData4.getLocalData().setMsgRateOut(1000); + + // expect to shed bundles with message rate(in+out) ((10000+10000)-(1000+1000))/2 = 9000 + // each bundle with 450 msg rate in and 450 msg rate out + // so 9000/(450+450)=10 bundles will be shed + for (int i = 0; i < 11; i++) { + brokerData1.getLocalData().getBundles().add("bundle1-" + i); + brokerData3.getLocalData().getBundles().add("bundle3-" + i); + + BundleData bundle = new BundleData(); + TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + // as AvgShedder map BundleData to broker, the hashCode of different BundleData should be different + // so we need to set some different fields to make the hashCode different + timeAverageMessageData.setNumSamples(i); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle1-" + i, bundle); + + bundle = new BundleData(); + timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + timeAverageMessageData.setNumSamples(i+11); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle3-" + i, bundle); + } + + // do shedding for the first time, expect to shed nothing because hit count is not enough + Multimap bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 0); + + // do shedding for the second time, expect to shed 10*2=20 bundles + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 20); + + // assert that half of the bundles are shed from broker1, and the other half are shed from broker3 + for (String broker : bundlesToUnload.keys()) { + if (broker.equals("broker1")) { + assertEquals(bundlesToUnload.get(broker).size(), 10); + } else if (broker.equals("broker3")) { + assertEquals(bundlesToUnload.get(broker).size(), 10); + } else { + fail(); + } + } + + // assert that all the bundles from broker1 are shed to broker2, and all the bundles from broker3 are shed to broker4 + for (String bundle : bundlesToUnload.values()) { + BundleData bundleData = loadData.getBundleData().get(bundle); + if (bundle.startsWith("bundle1-")) { + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker2"); + } else if (bundle.startsWith("bundle3-")) { + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker4"); + } else { + fail(); + } + } + } } From 0ac2b9955c2e89bdb532c796541c987e75dc484b Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 17:03:03 +0800 Subject: [PATCH 09/30] fix checkstyle. --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e6e78d14260df..aba3ad3a669f5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2438,8 +2438,8 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece dynamic = true, category = CATEGORY_LOAD_BALANCER, doc = "In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio." - + "For AvgShedder, recommend to set to 0.5, so that it will distribute the load evenly" + - " between the highest and lowest brokers." + + "For AvgShedder, recommend to set to 0.5, so that it will distribute the load " + + "evenly between the highest and lowest brokers." ) private double maxUnloadPercentage = 0.2; From 1bf6346fe51472659d5e49e32356ca4a9cf9a8d0 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 17:13:54 +0800 Subject: [PATCH 10/30] fix header. --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 483f053afd5bf..de5a42421f469 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information From d2dbf6cca0833b663f37c1efa0282efd6ef9f8d9 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 17:49:05 +0800 Subject: [PATCH 11/30] fix checkstyle. --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index de5a42421f469..fe76ba8b213c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -22,6 +22,8 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.hash.Hashing; + +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -285,7 +287,7 @@ public Optional selectBroker(Set candidates, BundleData bundleTo } private static long hash(String key) { - return Hashing.crc32().hashString(key, Charsets.UTF_8).padToLong(); + return Hashing.crc32().hashString(key, StandardCharsets.UTF_8).padToLong(); } private static String getExpectedBroker(Collection brokers, BundleData bundle) { From 1b60887f57ab7e254b814bc057ffa77a7d4046c8 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 28 Jun 2024 18:06:05 +0800 Subject: [PATCH 12/30] fix checkstyle. --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index fe76ba8b213c8..74d0cad043772 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -18,11 +18,9 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import com.google.common.base.Charsets; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.hash.Hashing; - import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; From c2f6123d1577849bad7f41471ef79c311097f4af Mon Sep 17 00:00:00 2001 From: Wenzhi Feng <52550727+thetumbled@users.noreply.github.com> Date: Fri, 5 Jul 2024 18:01:23 +0800 Subject: [PATCH 13/30] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java Co-authored-by: Kai Wang --- .../apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 74d0cad043772..72faf107b1925 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -254,13 +254,12 @@ public void onActiveBrokersChange(Set activeBrokers) { LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers); } - private Double calculateScores(String broker, LocalBrokerData localBrokerData, final ServiceConfiguration conf) { - double score = localBrokerData.getMaxResourceUsageWithWeight( + private Double calculateScores(LocalBrokerData localBrokerData, final ServiceConfiguration conf) { + return localBrokerData.getMaxResourceUsageWithWeight( conf.getLoadBalancerCPUResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(), conf.getLoadBalancerBandwithInResourceWeight(), conf.getLoadBalancerBandwithOutResourceWeight()) * 100; - return score; } @Override From 00e5625b8a2f819a3d0e40c07847e87ede535523 Mon Sep 17 00:00:00 2001 From: Wenzhi Feng <52550727+thetumbled@users.noreply.github.com> Date: Fri, 5 Jul 2024 18:01:36 +0800 Subject: [PATCH 14/30] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java Co-authored-by: Kai Wang --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 72faf107b1925..ef0f46984d4ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -205,7 +205,7 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " + "No Load Shedding will be done on this broker", maxLocalBrokerData.getBundles().iterator().next(), pair.getRight()); - } else if (maxLocalBrokerData.getBundles().size() == 0) { + } else if (maxLocalBrokerData.getBundles().isEmpty()) { log.warn("Broker {} is overloaded despite having no bundles", pair.getRight()); } From 1c5afe931537f8d14ff4acee6125f5157f2c673b Mon Sep 17 00:00:00 2001 From: Wenzhi Feng <52550727+thetumbled@users.noreply.github.com> Date: Fri, 5 Jul 2024 18:01:56 +0800 Subject: [PATCH 15/30] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java Co-authored-by: Kai Wang --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index ef0f46984d4ad..ec5e8b4e7d98f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -100,7 +100,7 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi } // sort brokers by scores. - Collections.sort(brokers, (e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2))); + brokers.sort((e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2))); if (log.isDebugEnabled()) { log.debug("sorted broker list:{}", brokers); } From 4f4ae3e0b4d6e274a73f5de4b82c94d80ceb20c6 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 5 Jul 2024 18:06:26 +0800 Subject: [PATCH 16/30] improve. --- .../broker/loadbalance/impl/AvgShedder.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index ec5e8b4e7d98f..ef0cf07cef490 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -32,6 +32,8 @@ import java.util.Optional; import java.util.Random; import java.util.Set; + +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableDouble; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; @@ -45,9 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Slf4j public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrategy { - private static Logger log = LoggerFactory.getLogger(AvgShedder.class); - // map bundle to broker. private final Map bundleBrokerMap = new HashMap<>(); // map broker to Scores. scores:0-100 @@ -84,14 +85,14 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi minMsgThreshold, minThroughputThreshold); } - List brokers = new LinkedList<>(); + List brokers = new ArrayList<>(loadData.getBrokerData().size()); brokerScoreMap.clear(); // calculate scores of brokers. for (Map.Entry entry : loadData.getBrokerData().entrySet()) { LocalBrokerData localBrokerData = entry.getValue().getLocalData(); String broker = entry.getKey(); brokers.add(broker); - Double score = calculateScores(broker, localBrokerData, conf); + Double score = calculateScores(localBrokerData, conf); brokerScoreMap.put(broker, score); // collect data to analyze the relations between scores and throughput and messageRate. log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score, @@ -283,10 +284,6 @@ public Optional selectBroker(Set candidates, BundleData bundleTo } } - private static long hash(String key) { - return Hashing.crc32().hashString(key, StandardCharsets.UTF_8).padToLong(); - } - private static String getExpectedBroker(Collection brokers, BundleData bundle) { List sortedBrokers = new ArrayList<>(brokers); Collections.sort(sortedBrokers); @@ -295,7 +292,8 @@ private static String getExpectedBroker(Collection brokers, BundleData b // use random number as input of hashing function to avoid special case that, // if there is 4 brokers running in the cluster,and add broker5,and shutdown broker3, // then all bundles belonging to broker3 will be loaded on the same broker. - final long hashcode = hash(String.valueOf(random.nextInt())); + final long hashcode = Hashing.crc32().hashString(String.valueOf(random.nextInt()), + StandardCharsets.UTF_8).padToLong(); final int index = (int) (Math.abs(hashcode) % sortedBrokers.size()); if (log.isDebugEnabled()) { log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}", From 64de192c6e4be0ae78290f035dc6f8cf85aa2dbf Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 5 Jul 2024 18:07:28 +0800 Subject: [PATCH 17/30] fix check style. --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index ef0cf07cef490..4f683f77fb6a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -44,8 +44,6 @@ import org.apache.pulsar.policies.data.loadbalancer.BundleData; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Slf4j public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrategy { From bec01e6e1c77900301c4e2afa181d76ae05b748e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 5 Jul 2024 23:28:11 +0800 Subject: [PATCH 18/30] throw exception. --- .../broker/loadbalance/impl/ModularLoadManagerImpl.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 957a2b181a50d..8f095b7d84df8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -271,6 +271,15 @@ public void initialize(final PulsarService pulsar) { }); if (placementStrategy instanceof LoadSheddingStrategy) { + // if the placement strategy is also a load shedding strategy + // we need to check two strategies are the same + if (!conf.getLoadBalancerLoadSheddingStrategy().equals( + conf.getLoadBalancerPlacementStrategy())) { + throw new IllegalArgumentException("The load shedding strategy: " + + conf.getLoadBalancerLoadSheddingStrategy() + + " can't work with the placement strategy: " + + conf.getLoadBalancerPlacementStrategy()); + } // bind the load shedding strategy and the placement strategy loadSheddingStrategy = (LoadSheddingStrategy) placementStrategy; } else { From e438be490a5d1db1c90afce3a305bbe7a552c56f Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Sat, 6 Jul 2024 10:35:15 +0800 Subject: [PATCH 19/30] fix checkstyle. --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 4f683f77fb6a5..2f64ea7f59c2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -32,7 +32,6 @@ import java.util.Optional; import java.util.Random; import java.util.Set; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableDouble; import org.apache.commons.lang3.tuple.Pair; From 4cfbbda050220f68010ca041dffc6d0df571c42b Mon Sep 17 00:00:00 2001 From: Wenzhi Feng <52550727+thetumbled@users.noreply.github.com> Date: Mon, 8 Jul 2024 16:47:14 +0800 Subject: [PATCH 20/30] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java Co-authored-by: Yunze Xu --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 2f64ea7f59c2e..e53c866649658 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -60,9 +60,6 @@ public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrat private static final double MB = 1024 * 1024; private static final Random random = new Random(); - public AvgShedder() { - } - @Override public Multimap findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) { selectedBundlesCache.clear(); From 5eac740942bf0d97c882ddc4224ebfbf2ffad30f Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 8 Jul 2024 16:52:47 +0800 Subject: [PATCH 21/30] fix deprecated conf. --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index e53c866649658..55a9ceb09f98b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -253,8 +253,8 @@ private Double calculateScores(LocalBrokerData localBrokerData, final ServiceCon return localBrokerData.getMaxResourceUsageWithWeight( conf.getLoadBalancerCPUResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(), - conf.getLoadBalancerBandwithInResourceWeight(), - conf.getLoadBalancerBandwithOutResourceWeight()) * 100; + conf.getLoadBalancerBandwidthInResourceWeight(), + conf.getLoadBalancerBandwidthOutResourceWeight()) * 100; } @Override From eca4027e387f98df7c7c67aad4d41770bcd7497c Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 8 Jul 2024 16:54:14 +0800 Subject: [PATCH 22/30] improve log. --- .../pulsar/broker/loadbalance/impl/AvgShedder.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 55a9ceb09f98b..ab9048d3343f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -88,17 +88,16 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi brokers.add(broker); Double score = calculateScores(localBrokerData, conf); brokerScoreMap.put(broker, score); - // collect data to analyze the relations between scores and throughput and messageRate. - log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score, - localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(), - localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut()); + if (log.isDebugEnabled()) { + log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score, + localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(), + localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut()); + } } // sort brokers by scores. brokers.sort((e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2))); - if (log.isDebugEnabled()) { - log.debug("sorted broker list:{}", brokers); - } + log.info("sorted broker list:{}", brokers); // find broker pairs for shedding. List> pairs = new LinkedList<>(); From 2fddb7745f0b9fa1d5fceee9d585c0e025f8b0f0 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 8 Jul 2024 18:19:48 +0800 Subject: [PATCH 23/30] refactor to some sub methods. --- .../broker/loadbalance/impl/AvgShedder.java | 261 +++++++++--------- 1 file changed, 136 insertions(+), 125 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index ab9048d3343f9..b02923e1c8e3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -66,8 +66,6 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi final double minThroughputThreshold = conf.getMinUnloadMessageThroughput(); final double minMsgThreshold = conf.getMinUnloadMessage(); final double maxUnloadPercentage = conf.getMaxUnloadPercentage(); - - final Map recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); final double lowThreshold = conf.getLoadBalancerAvgShedderLowThreshold(); final double highThreshold = conf.getLoadBalancerAvgShedderHighThreshold(); final int hitCountHighThreshold = conf.getLoadBalancerAvgShedderHitCountHighThreshold(); @@ -80,63 +78,15 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi } List brokers = new ArrayList<>(loadData.getBrokerData().size()); - brokerScoreMap.clear(); - // calculate scores of brokers. - for (Map.Entry entry : loadData.getBrokerData().entrySet()) { - LocalBrokerData localBrokerData = entry.getValue().getLocalData(); - String broker = entry.getKey(); - brokers.add(broker); - Double score = calculateScores(localBrokerData, conf); - brokerScoreMap.put(broker, score); - if (log.isDebugEnabled()) { - log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score, - localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(), - localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut()); - } - } - - // sort brokers by scores. - brokers.sort((e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2))); + calculateScoresAndSort(loadData, conf, brokers); log.info("sorted broker list:{}", brokers); // find broker pairs for shedding. - List> pairs = new LinkedList<>(); - int i = 0, j = brokers.size() - 1; - while (i <= j) { - String maxBroker = brokers.get(j); - String minBroker = brokers.get(i); - if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < lowThreshold) { - brokerHitCountForHigh.remove(maxBroker); - brokerHitCountForHigh.remove(minBroker); - - brokerHitCountForLow.remove(maxBroker); - brokerHitCountForLow.remove(minBroker); - } else { - pairs.add(Pair.of(minBroker, maxBroker)); - if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < highThreshold) { - brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); - brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); - - brokerHitCountForHigh.remove(maxBroker); - brokerHitCountForHigh.remove(minBroker); - } else { - brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); - brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); - - brokerHitCountForHigh.put(minBroker, brokerHitCountForHigh.getOrDefault(minBroker, 0) + 1); - brokerHitCountForHigh.put(maxBroker, brokerHitCountForHigh.getOrDefault(maxBroker, 0) + 1); - } - } - i++; - j--; - } - - log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}", - brokerHitCountForHigh, brokerHitCountForLow); - + List> pairs = findBrokerPairs(brokers, lowThreshold, highThreshold); + log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}", brokerHitCountForHigh, brokerHitCountForLow); if (pairs.isEmpty()) { if (log.isDebugEnabled()) { - log.debug("there is no any overload broker.no need to shedding bundles."); + log.debug("there is no any overload broker, no need to shedding bundles."); } brokerHitCountForHigh.clear(); brokerHitCountForLow.clear(); @@ -159,88 +109,94 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi // if hit, remove entry. brokerHitCountForHigh.remove(underloadedBroker); brokerHitCountForHigh.remove(overloadedBroker); - brokerHitCountForLow.remove(underloadedBroker); brokerHitCountForLow.remove(overloadedBroker); - // calculate how much throughput to unload. - LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData(); - LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData(); - - double minMsgRate = minLocalBrokerData.getMsgRateIn() + minLocalBrokerData.getMsgRateOut(); - double maxMsgRate = maxLocalBrokerData.getMsgRateIn() + maxLocalBrokerData.getMsgRateOut(); + // select bundle for unloading. + selectBundleForUnloading(loadData, overloadedBroker, underloadedBroker, minThroughputThreshold, + minMsgThreshold, maxUnloadPercentage); + } + return selectedBundlesCache; + } - double minThroughput = minLocalBrokerData.getMsgThroughputIn() + minLocalBrokerData.getMsgThroughputOut(); - double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut(); + private void selectBundleForUnloading(LoadData loadData, String overloadedBroker, String underloadedBroker, + double minThroughputThreshold, double minMsgThreshold, double maxUnloadPercentage) { + // calculate how much throughput to unload. + LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData(); + LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData(); - double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage; - double throughputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage; + double minMsgRate = minLocalBrokerData.getMsgRateIn() + minLocalBrokerData.getMsgRateOut(); + double maxMsgRate = maxLocalBrokerData.getMsgRateIn() + maxLocalBrokerData.getMsgRateOut(); - boolean isMsgRateToOffload; - MutableDouble trafficMarkedToOffload = new MutableDouble(0); + double minThroughput = minLocalBrokerData.getMsgThroughputIn() + minLocalBrokerData.getMsgThroughputOut(); + double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut(); - if (msgRequiredFromUnloadedBundles > minMsgThreshold) { - isMsgRateToOffload = true; - trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles); - } else if (throughputRequiredFromUnloadedBundles > minThroughputThreshold) { - isMsgRateToOffload = false; - trafficMarkedToOffload.setValue(throughputRequiredFromUnloadedBundles); - } else { - log.info( - "broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is " - + "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s" - + " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.", - overloadedBroker, underloadedBroker, throughputRequiredFromUnloadedBundles / MB, - minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold); - continue; - } + double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage; + double throughputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage; - if (maxLocalBrokerData.getBundles().size() == 1) { - log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " - + "No Load Shedding will be done on this broker", - maxLocalBrokerData.getBundles().iterator().next(), pair.getRight()); - } else if (maxLocalBrokerData.getBundles().isEmpty()) { - log.warn("Broker {} is overloaded despite having no bundles", pair.getRight()); - } + boolean isMsgRateToOffload; + MutableDouble trafficMarkedToOffload = new MutableDouble(0); - // do shedding + if (msgRequiredFromUnloadedBundles > minMsgThreshold) { + isMsgRateToOffload = true; + trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles); + } else if (throughputRequiredFromUnloadedBundles > minThroughputThreshold) { + isMsgRateToOffload = false; + trafficMarkedToOffload.setValue(throughputRequiredFromUnloadedBundles); + } else { log.info( - "broker:[{}] is planning to shed bundles to broker:[{}]. " - + "maxBroker stat:scores:{}, throughput:{}, msgRate:{}. " - + "minBroker stat:scores:{}, throughput:{}, msgRate:{}. " - + "isMsgRateToOffload:{}, trafficMarkedToOffload:{}", - overloadedBroker, underloadedBroker, brokerScoreMap.get(overloadedBroker), maxThroughput, - maxMsgRate, brokerScoreMap.get(underloadedBroker), minThroughput, minMsgRate, - isMsgRateToOffload, trafficMarkedToOffload); - - loadData.getBundleDataForLoadShedding().entrySet().stream().filter(e -> - maxLocalBrokerData.getBundles().contains(e.getKey()) - ).filter(e -> - !recentlyUnloadedBundles.containsKey(e.getKey()) - ).map((e) -> { - BundleData bundleData = e.getValue(); - TimeAverageMessageData shortTermData = bundleData.getShortTermData(); - double traffic = isMsgRateToOffload - ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut() - : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); - return Pair.of(e, traffic); - }).sorted((e1, e2) -> - Double.compare(e2.getRight(), e1.getRight()) - ).forEach(e -> { - Map.Entry bundle = e.getLeft(); - double traffic = e.getRight(); - if (traffic > 0 && traffic <= trafficMarkedToOffload.getValue()) { - selectedBundlesCache.put(overloadedBroker, bundle.getKey()); - bundleBrokerMap.put(bundle.getValue(), underloadedBroker); - trafficMarkedToOffload.add(-traffic); - if (log.isDebugEnabled()) { - log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}", - bundle, isMsgRateToOffload, traffic); - } - } - }); + "broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is " + + "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s" + + " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.", + overloadedBroker, underloadedBroker, throughputRequiredFromUnloadedBundles / MB, + minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold); + return; } - return selectedBundlesCache; + + if (maxLocalBrokerData.getBundles().size() == 1) { + log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " + + "No Load Shedding will be done on this broker", + maxLocalBrokerData.getBundles().iterator().next(), overloadedBroker); + } else if (maxLocalBrokerData.getBundles().isEmpty()) { + log.warn("Broker {} is overloaded despite having no bundles", overloadedBroker); + } + + // do shedding + log.info( + "broker:[{}] is planning to shed bundles to broker:[{}]. " + + "maxBroker stat:scores:{}, throughput:{}, msgRate:{}. " + + "minBroker stat:scores:{}, throughput:{}, msgRate:{}. " + + "isMsgRateToOffload:{}, trafficMarkedToOffload:{}", + overloadedBroker, underloadedBroker, brokerScoreMap.get(overloadedBroker), maxThroughput, + maxMsgRate, brokerScoreMap.get(underloadedBroker), minThroughput, minMsgRate, + isMsgRateToOffload, trafficMarkedToOffload); + + loadData.getBundleDataForLoadShedding().entrySet().stream().filter(e -> + maxLocalBrokerData.getBundles().contains(e.getKey()) + ).filter(e -> + !loadData.getRecentlyUnloadedBundles().containsKey(e.getKey()) + ).map((e) -> { + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double traffic = isMsgRateToOffload + ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut() + : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); + return Pair.of(e, traffic); + }).sorted((e1, e2) -> + Double.compare(e2.getRight(), e1.getRight()) + ).forEach(e -> { + Map.Entry bundle = e.getLeft(); + double traffic = e.getRight(); + if (traffic > 0 && traffic <= trafficMarkedToOffload.getValue()) { + selectedBundlesCache.put(overloadedBroker, bundle.getKey()); + bundleBrokerMap.put(bundle.getValue(), underloadedBroker); + trafficMarkedToOffload.add(-traffic); + if (log.isDebugEnabled()) { + log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}", + bundle, isMsgRateToOffload, traffic); + } + } + }); } @Override @@ -248,6 +204,27 @@ public void onActiveBrokersChange(Set activeBrokers) { LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers); } + private void calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf, List brokers) { + brokerScoreMap.clear(); + + // calculate scores of brokers. + for (Map.Entry entry : loadData.getBrokerData().entrySet()) { + LocalBrokerData localBrokerData = entry.getValue().getLocalData(); + String broker = entry.getKey(); + brokers.add(broker); + Double score = calculateScores(localBrokerData, conf); + brokerScoreMap.put(broker, score); + if (log.isDebugEnabled()) { + log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score, + localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(), + localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut()); + } + } + + // sort brokers by scores. + brokers.sort((e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2))); + } + private Double calculateScores(LocalBrokerData localBrokerData, final ServiceConfiguration conf) { return localBrokerData.getMaxResourceUsageWithWeight( conf.getLoadBalancerCPUResourceWeight(), @@ -256,6 +233,40 @@ private Double calculateScores(LocalBrokerData localBrokerData, final ServiceCon conf.getLoadBalancerBandwidthOutResourceWeight()) * 100; } + private List> findBrokerPairs(List brokers, double lowThreshold, double highThreshold) { + List> pairs = new LinkedList<>(); + int i = 0, j = brokers.size() - 1; + while (i <= j) { + String maxBroker = brokers.get(j); + String minBroker = brokers.get(i); + if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < lowThreshold) { + brokerHitCountForHigh.remove(maxBroker); + brokerHitCountForHigh.remove(minBroker); + + brokerHitCountForLow.remove(maxBroker); + brokerHitCountForLow.remove(minBroker); + } else { + pairs.add(Pair.of(minBroker, maxBroker)); + if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < highThreshold) { + brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); + brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); + + brokerHitCountForHigh.remove(maxBroker); + brokerHitCountForHigh.remove(minBroker); + } else { + brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); + brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); + + brokerHitCountForHigh.put(minBroker, brokerHitCountForHigh.getOrDefault(minBroker, 0) + 1); + brokerHitCountForHigh.put(maxBroker, brokerHitCountForHigh.getOrDefault(maxBroker, 0) + 1); + } + } + i++; + j--; + } + return pairs; + } + @Override public Optional selectBroker(Set candidates, BundleData bundleToAssign, LoadData loadData, ServiceConfiguration conf) { From f731a25894d52239e48270762b6d6e879373094c Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 8 Jul 2024 18:31:26 +0800 Subject: [PATCH 24/30] refactor map. --- .../broker/loadbalance/impl/AvgShedder.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index b02923e1c8e3b..6594d60542966 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -34,6 +34,7 @@ import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableDouble; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LoadData; @@ -51,8 +52,8 @@ public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrat // map broker to Scores. scores:0-100 private final Map brokerScoreMap = new HashMap<>(); // map broker hit count for high threshold/low threshold - private final Map brokerHitCountForHigh = new HashMap<>(); - private final Map brokerHitCountForLow = new HashMap<>(); + private final Map brokerHitCountForHigh = new HashMap<>(); + private final Map brokerHitCountForLow = new HashMap<>(); // result returned by shedding, map broker to bundles. private final Multimap selectedBundlesCache = ArrayListMultimap.create(); @@ -99,10 +100,14 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi String underloadedBroker = pair.getLeft(); // check hit count for high threshold and low threshold. - if (!(brokerHitCountForHigh.getOrDefault(underloadedBroker, 0) >= hitCountHighThreshold) - && !(brokerHitCountForHigh.getOrDefault(overloadedBroker, 0) >= hitCountHighThreshold) - && !(brokerHitCountForLow.getOrDefault(underloadedBroker, 0) >= hitCountLowThreshold) - && !(brokerHitCountForLow.getOrDefault(overloadedBroker, 0) >= hitCountLowThreshold)) { + if (!(brokerHitCountForHigh.computeIfAbsent(underloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountHighThreshold) + && !(brokerHitCountForHigh.computeIfAbsent(overloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountHighThreshold) + && !(brokerHitCountForLow.computeIfAbsent(underloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountLowThreshold) + && !(brokerHitCountForLow.computeIfAbsent(overloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountLowThreshold)) { continue; } @@ -248,17 +253,17 @@ private List> findBrokerPairs(List brokers, double } else { pairs.add(Pair.of(minBroker, maxBroker)); if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < highThreshold) { - brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); - brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); + brokerHitCountForLow.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment(); + brokerHitCountForLow.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment(); brokerHitCountForHigh.remove(maxBroker); brokerHitCountForHigh.remove(minBroker); } else { - brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1); - brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1); + brokerHitCountForLow.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment(); + brokerHitCountForLow.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment(); - brokerHitCountForHigh.put(minBroker, brokerHitCountForHigh.getOrDefault(minBroker, 0) + 1); - brokerHitCountForHigh.put(maxBroker, brokerHitCountForHigh.getOrDefault(maxBroker, 0) + 1); + brokerHitCountForHigh.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment(); + brokerHitCountForHigh.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment(); } } i++; From 11d56c84341e5d5161a2fddf5c5270575b91d39a Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 10 Jul 2024 15:45:57 +0800 Subject: [PATCH 25/30] fix check style. --- .../apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 6594d60542966..4cc46c152c84f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -125,7 +125,8 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi } private void selectBundleForUnloading(LoadData loadData, String overloadedBroker, String underloadedBroker, - double minThroughputThreshold, double minMsgThreshold, double maxUnloadPercentage) { + double minThroughputThreshold, double minMsgThreshold, + double maxUnloadPercentage) { // calculate how much throughput to unload. LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData(); LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData(); @@ -238,7 +239,8 @@ private Double calculateScores(LocalBrokerData localBrokerData, final ServiceCon conf.getLoadBalancerBandwidthOutResourceWeight()) * 100; } - private List> findBrokerPairs(List brokers, double lowThreshold, double highThreshold) { + private List> findBrokerPairs(List brokers, + double lowThreshold, double highThreshold) { List> pairs = new LinkedList<>(); int i = 0, j = brokers.size() - 1; while (i <= j) { From 0fc8bc51d12acecf81e0f6051d658d52212d4725 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 11 Jul 2024 14:17:58 +0800 Subject: [PATCH 26/30] avoid create list manually. --- .../pulsar/broker/loadbalance/impl/AvgShedder.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 4cc46c152c84f..746addfacdb23 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -78,8 +78,7 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi minMsgThreshold, minThroughputThreshold); } - List brokers = new ArrayList<>(loadData.getBrokerData().size()); - calculateScoresAndSort(loadData, conf, brokers); + List brokers = calculateScoresAndSort(loadData, conf); log.info("sorted broker list:{}", brokers); // find broker pairs for shedding. @@ -210,14 +209,13 @@ public void onActiveBrokersChange(Set activeBrokers) { LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers); } - private void calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf, List brokers) { + private List calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf) { brokerScoreMap.clear(); // calculate scores of brokers. for (Map.Entry entry : loadData.getBrokerData().entrySet()) { LocalBrokerData localBrokerData = entry.getValue().getLocalData(); String broker = entry.getKey(); - brokers.add(broker); Double score = calculateScores(localBrokerData, conf); brokerScoreMap.put(broker, score); if (log.isDebugEnabled()) { @@ -228,7 +226,8 @@ private void calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf } // sort brokers by scores. - brokers.sort((e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2))); + return brokerScoreMap.entrySet().stream().sorted((o1, o2) -> (int) (o1.getValue() - o2.getValue())) + .map(Map.Entry::getKey).toList(); } private Double calculateScores(LocalBrokerData localBrokerData, final ServiceConfiguration conf) { From 65b894195ebeee87ca07af07da1d832a8cb545cd Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 11 Jul 2024 14:27:29 +0800 Subject: [PATCH 27/30] reduce one find operation. --- .../apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 746addfacdb23..9f17d4b926b48 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -276,7 +276,8 @@ private List> findBrokerPairs(List brokers, @Override public Optional selectBroker(Set candidates, BundleData bundleToAssign, LoadData loadData, ServiceConfiguration conf) { - if (!bundleBrokerMap.containsKey(bundleToAssign) || !candidates.contains(bundleBrokerMap.get(bundleToAssign))) { + final var brokerToUnload = bundleBrokerMap.getOrDefault(bundleToAssign, null); + if (brokerToUnload == null || !candidates.contains(bundleBrokerMap.get(bundleToAssign))) { // cluster initializing or broker is shutdown if (log.isDebugEnabled()) { if (!bundleBrokerMap.containsKey(bundleToAssign)) { @@ -290,7 +291,7 @@ public Optional selectBroker(Set candidates, BundleData bundleTo bundleBrokerMap.put(bundleToAssign, broker); return Optional.of(broker); } else { - return Optional.of(bundleBrokerMap.get(bundleToAssign)); + return Optional.of(brokerToUnload); } } From dfb8765710b74444b055af4ed88d76c381eb9fb5 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 11 Jul 2024 14:37:56 +0800 Subject: [PATCH 28/30] add test to cover getExpectedBroker. --- .../ModularLoadManagerStrategyTest.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java index 818af35943f36..53ddde8856c63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.Arrays; @@ -48,13 +49,13 @@ @Test(groups = "broker") public class ModularLoadManagerStrategyTest { - public void testAvgShedder() throws Exception{ + public void testAvgShedderWithPreassignedBroker() throws Exception { ModularLoadManagerStrategy strategy = new AvgShedder(); Field field = AvgShedder.class.getDeclaredField("bundleBrokerMap"); field.setAccessible(true); Map bundleBrokerMap = (Map) field.get(strategy); BundleData bundleData = new BundleData(); - // assign bundle broker1 in bundleBrokerMap. + // assign bundle to broker1 in bundleBrokerMap. bundleBrokerMap.put(bundleData, "1"); assertEquals(strategy.selectBroker(Set.of("1", "2", "3"), bundleData, null, null), Optional.of("1")); assertEquals(bundleBrokerMap.get(bundleData), "1"); @@ -64,6 +65,31 @@ public void testAvgShedder() throws Exception{ assertEquals(bundleBrokerMap.get(bundleData), "2"); } + public void testAvgShedderWithoutPreassignedBroker() throws Exception { + ModularLoadManagerStrategy strategy = new AvgShedder(); + Field field = AvgShedder.class.getDeclaredField("bundleBrokerMap"); + field.setAccessible(true); + Map bundleBrokerMap = (Map) field.get(strategy); + BundleData bundleData = new BundleData(); + Set candidates = new HashSet<>(); + candidates.add("1"); + candidates.add("2"); + candidates.add("3"); + + // select broker from candidates randomly. + Optional selectedBroker = strategy.selectBroker(candidates, bundleData, null, null); + assertTrue(selectedBroker.isPresent()); + assertTrue(candidates.contains(selectedBroker.get())); + assertEquals(bundleBrokerMap.get(bundleData), selectedBroker.get()); + + // remove original broker in candidates + candidates.remove(selectedBroker.get()); + selectedBroker = strategy.selectBroker(candidates, bundleData, null, null); + assertTrue(selectedBroker.isPresent()); + assertTrue(candidates.contains(selectedBroker.get())); + assertEquals(bundleBrokerMap.get(bundleData), selectedBroker.get()); + } + // Test that least long term message rate works correctly. public void testLeastLongTermMessageRate() { BundleData bundleData = new BundleData(); From 4a0f64ccd8d2fd9730b43936ea6f7488180949dc Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 11 Jul 2024 14:39:49 +0800 Subject: [PATCH 29/30] avoid static Random object . --- .../org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index 9f17d4b926b48..dadf263eb6e19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -57,9 +57,7 @@ public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrat // result returned by shedding, map broker to bundles. private final Multimap selectedBundlesCache = ArrayListMultimap.create(); - private static final double MB = 1024 * 1024; - private static final Random random = new Random(); @Override public Multimap findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) { @@ -303,7 +301,7 @@ private static String getExpectedBroker(Collection brokers, BundleData b // use random number as input of hashing function to avoid special case that, // if there is 4 brokers running in the cluster,and add broker5,and shutdown broker3, // then all bundles belonging to broker3 will be loaded on the same broker. - final long hashcode = Hashing.crc32().hashString(String.valueOf(random.nextInt()), + final long hashcode = Hashing.crc32().hashString(String.valueOf(new Random().nextInt()), StandardCharsets.UTF_8).padToLong(); final int index = (int) (Math.abs(hashcode) % sortedBrokers.size()); if (log.isDebugEnabled()) { From 26f547906d7d94dde021f50a80ec609857afc80e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 11 Jul 2024 14:54:16 +0800 Subject: [PATCH 30/30] make selectedBundlesCache locally. --- .../pulsar/broker/loadbalance/impl/AvgShedder.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java index dadf263eb6e19..39ff242fc6c17 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -54,14 +54,14 @@ public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrat // map broker hit count for high threshold/low threshold private final Map brokerHitCountForHigh = new HashMap<>(); private final Map brokerHitCountForLow = new HashMap<>(); - - // result returned by shedding, map broker to bundles. - private final Multimap selectedBundlesCache = ArrayListMultimap.create(); private static final double MB = 1024 * 1024; @Override public Multimap findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) { - selectedBundlesCache.clear(); + // result returned by shedding, map broker to bundles. + Multimap selectedBundlesCache = ArrayListMultimap.create(); + + // configuration for shedding. final double minThroughputThreshold = conf.getMinUnloadMessageThroughput(); final double minMsgThreshold = conf.getMinUnloadMessage(); final double maxUnloadPercentage = conf.getMaxUnloadPercentage(); @@ -116,14 +116,14 @@ public Multimap findBundlesForUnloading(LoadData loadData, Servi // select bundle for unloading. selectBundleForUnloading(loadData, overloadedBroker, underloadedBroker, minThroughputThreshold, - minMsgThreshold, maxUnloadPercentage); + minMsgThreshold, maxUnloadPercentage, selectedBundlesCache); } return selectedBundlesCache; } private void selectBundleForUnloading(LoadData loadData, String overloadedBroker, String underloadedBroker, double minThroughputThreshold, double minMsgThreshold, - double maxUnloadPercentage) { + double maxUnloadPercentage, Multimap selectedBundlesCache) { // calculate how much throughput to unload. LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData(); LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData();