From 8efa68ebffce6c3cf685d13f4b3b9ce0bc7f6d8a Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 11 Jun 2024 11:03:15 +0800 Subject: [PATCH] make resource work for other algo. --- conf/broker.conf | 5 ---- conf/standalone.conf | 5 ---- .../impl/LeastLongTermMessageRate.java | 6 ++++- .../impl/ModularLoadManagerImpl.java | 26 ++++++++++++------- .../loadbalance/impl/OverloadShedder.java | 9 +++++-- 5 files changed, 29 insertions(+), 22 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 8fd266d609cf4..5c5d8d42817e9 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1394,19 +1394,15 @@ loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold=4 loadBalancerHistoryResourcePercentage=0.9 # The BandWidthIn usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerBandwidthInResourceWeight=1.0 # The BandWidthOut usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerBandwidthOutResourceWeight=1.0 # The CPU usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerCPUResourceWeight=1.0 # The direct memory usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. # Direct memory usage cannot accurately reflect the machine's load, # and it is not recommended to use it to score the machine's load. loadBalancerDirectMemoryResourceWeight=0 @@ -1814,7 +1810,6 @@ strictBookieAffinityEnabled=false # These settings are left here for compatibility # The heap memory usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. # Deprecated: Memory is no longer used as a load balancing item loadBalancerMemoryResourceWeight=0 diff --git a/conf/standalone.conf b/conf/standalone.conf index 6b261ce11c6cd..635b31ac38def 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -911,23 +911,18 @@ loadBalancerBrokerThresholdShedderPercentage=10 loadBalancerHistoryResourcePercentage=0.9 # The BandWidthIn usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerBandwidthInResourceWeight=1.0 # The BandWidthOut usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerBandwidthOutResourceWeight=1.0 # The CPU usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerCPUResourceWeight=1.0 # The heap memory usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerMemoryResourceWeight=0 # The direct memory usage weight when calculating new resource usage. -# It only takes effect in the ThresholdShedder strategy. loadBalancerDirectMemoryResourceWeight=0 # Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java index fe161467338ff..f51ca797f0edb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastLongTermMessageRate.java @@ -52,7 +52,11 @@ public LeastLongTermMessageRate() { // Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY. private static double getScore(final BrokerData brokerData, final ServiceConfiguration conf) { final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0; - final double maxUsage = brokerData.getLocalData().getMaxResourceUsage(); + final double maxUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight( + conf.getLoadBalancerCPUResourceWeight(), + conf.getLoadBalancerDirectMemoryResourceWeight(), + conf.getLoadBalancerBandwidthInResourceWeight(), + conf.getLoadBalancerBandwidthOutResourceWeight()); if (maxUsage > overloadThreshold) { log.warn("Broker {} is overloaded: max usage={}", brokerData.getLocalData().getWebServiceUrl(), maxUsage); return Double.POSITIVE_INFINITY; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 5d08ea9c3c3be..c393eab5f04c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -425,6 +425,14 @@ private double percentChange(final double oldValue, final double newValue) { return 100 * Math.abs((oldValue - newValue) / oldValue); } + private double getMaxResourceUsageWithWeight(LocalBrokerData localBrokerData, ServiceConfiguration conf) { + return localBrokerData.getMaxResourceUsageWithWeight( + conf.getLoadBalancerCPUResourceWeight(), + conf.getLoadBalancerDirectMemoryResourceWeight(), + conf.getLoadBalancerBandwidthInResourceWeight(), + conf.getLoadBalancerBandwidthOutResourceWeight()); + } + // Determine if the broker data requires an update by delegating to the update condition. private boolean needBrokerDataUpdate() { final long updateMaxIntervalMillis = TimeUnit.MINUTES @@ -437,14 +445,13 @@ private boolean needBrokerDataUpdate() { // Always update after surpassing the maximum interval. return true; } - final double maxChange = Math - .max(100.0 * (Math.abs(lastData.getMaxResourceUsage() - localData.getMaxResourceUsage())), - Math.max(percentChange(lastData.getMsgRateIn() + lastData.getMsgRateOut(), - localData.getMsgRateIn() + localData.getMsgRateOut()), - Math.max( - percentChange(lastData.getMsgThroughputIn() + lastData.getMsgThroughputOut(), - localData.getMsgThroughputIn() + localData.getMsgThroughputOut()), - percentChange(lastData.getNumBundles(), localData.getNumBundles())))); + final double maxChange = LocalBrokerData.max( + percentChange(lastData.getMsgRateIn() + lastData.getMsgRateOut(), + localData.getMsgRateIn() + localData.getMsgRateOut()), + percentChange(lastData.getMsgThroughputIn() + lastData.getMsgThroughputOut(), + localData.getMsgThroughputIn() + localData.getMsgThroughputOut()), + percentChange(lastData.getNumBundles(), localData.getNumBundles()), + 100.0 * (Math.abs(getMaxResourceUsageWithWeight(lastData, conf) - getMaxResourceUsageWithWeight(localData, conf)))); if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) { log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; " + "time since last report written is {} seconds", maxChange, @@ -932,7 +939,8 @@ Optional selectBroker(final ServiceUnitId serviceUnit) { } final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0; - final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage(); + final double maxUsage = getMaxResourceUsageWithWeight( + loadData.getBrokerData().get(broker.get()).getLocalData(), conf); if (maxUsage > overloadThreshold) { // All brokers that were in the filtered list were overloaded, so check if there is a better broker LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index a4eb5077224ce..fb31548227b31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -36,7 +36,8 @@ /** * Load shedding strategy which will attempt to shed exactly one bundle on brokers which are overloaded, that is, whose * maximum system resource usage exceeds loadBalancerBrokerOverloadedThresholdPercentage. To see which resources are - * considered when determining the maximum system resource, see {@link LocalBrokerData#getMaxResourceUsage()}. A bundle + * considered when determining the maximum system resource, see + * {@link LocalBrokerData#getMaxResourceUsageWithWeight(double, double, double, double)}. A bundle * is recommended for unloading off that broker if and only if the following conditions hold: The broker has at * least two bundles assigned and the broker has at least one bundle that has not been unloaded recently according to * LoadBalancerSheddingGracePeriodMinutes. The unloaded bundle will be the most expensive bundle in terms of message @@ -71,7 +72,11 @@ public Multimap findBundlesForUnloading(final LoadData loadData, loadData.getBrokerData().forEach((broker, brokerData) -> { final LocalBrokerData localData = brokerData.getLocalData(); - final double currentUsage = localData.getMaxResourceUsage(); + final double currentUsage = localData.getMaxResourceUsageWithWeight( + conf.getLoadBalancerCPUResourceWeight(), + conf.getLoadBalancerDirectMemoryResourceWeight(), + conf.getLoadBalancerBandwidthInResourceWeight(), + conf.getLoadBalancerBandwidthOutResourceWeight()); if (currentUsage < overloadThreshold) { if (log.isDebugEnabled()) { log.debug("[{}] Broker is not overloaded, ignoring at this point ({})", broker,