Skip to content

Commit

Permalink
make resource work for other algo.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Jun 11, 2024
1 parent f17d90e commit 8efa68e
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
5 changes: 0 additions & 5 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1394,19 +1394,15 @@ loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold=4
loadBalancerHistoryResourcePercentage=0.9

# The BandWidthIn usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwidthInResourceWeight=1.0

# The BandWidthOut usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwidthOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
# Direct memory usage cannot accurately reflect the machine's load,
# and it is not recommended to use it to score the machine's load.
loadBalancerDirectMemoryResourceWeight=0
Expand Down Expand Up @@ -1814,7 +1810,6 @@ strictBookieAffinityEnabled=false
# These settings are left here for compatibility

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
# Deprecated: Memory is no longer used as a load balancing item
loadBalancerMemoryResourceWeight=0

Expand Down
5 changes: 0 additions & 5 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -911,23 +911,18 @@ loadBalancerBrokerThresholdShedderPercentage=10
loadBalancerHistoryResourcePercentage=0.9

# The BandWidthIn usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwidthInResourceWeight=1.0

# The BandWidthOut usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwidthOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=0

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=0

# Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public LeastLongTermMessageRate() {
// Any broker at (or above) the overload threshold will have a score of POSITIVE_INFINITY.
private static double getScore(final BrokerData brokerData, final ServiceConfiguration conf) {
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = brokerData.getLocalData().getMaxResourceUsage();
final double maxUsage = brokerData.getLocalData().getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwidthInResourceWeight(),
conf.getLoadBalancerBandwidthOutResourceWeight());
if (maxUsage > overloadThreshold) {
log.warn("Broker {} is overloaded: max usage={}", brokerData.getLocalData().getWebServiceUrl(), maxUsage);
return Double.POSITIVE_INFINITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,14 @@ private double percentChange(final double oldValue, final double newValue) {
return 100 * Math.abs((oldValue - newValue) / oldValue);
}

private double getMaxResourceUsageWithWeight(LocalBrokerData localBrokerData, ServiceConfiguration conf) {
return localBrokerData.getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwidthInResourceWeight(),
conf.getLoadBalancerBandwidthOutResourceWeight());
}

// Determine if the broker data requires an update by delegating to the update condition.
private boolean needBrokerDataUpdate() {
final long updateMaxIntervalMillis = TimeUnit.MINUTES
Expand All @@ -437,14 +445,13 @@ private boolean needBrokerDataUpdate() {
// Always update after surpassing the maximum interval.
return true;
}
final double maxChange = Math
.max(100.0 * (Math.abs(lastData.getMaxResourceUsage() - localData.getMaxResourceUsage())),
Math.max(percentChange(lastData.getMsgRateIn() + lastData.getMsgRateOut(),
localData.getMsgRateIn() + localData.getMsgRateOut()),
Math.max(
percentChange(lastData.getMsgThroughputIn() + lastData.getMsgThroughputOut(),
localData.getMsgThroughputIn() + localData.getMsgThroughputOut()),
percentChange(lastData.getNumBundles(), localData.getNumBundles()))));
final double maxChange = LocalBrokerData.max(
percentChange(lastData.getMsgRateIn() + lastData.getMsgRateOut(),
localData.getMsgRateIn() + localData.getMsgRateOut()),
percentChange(lastData.getMsgThroughputIn() + lastData.getMsgThroughputOut(),
localData.getMsgThroughputIn() + localData.getMsgThroughputOut()),
percentChange(lastData.getNumBundles(), localData.getNumBundles()),
100.0 * (Math.abs(getMaxResourceUsageWithWeight(lastData, conf) - getMaxResourceUsageWithWeight(localData, conf))));
if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) {
log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; "
+ "time since last report written is {} seconds", maxChange,
Expand Down Expand Up @@ -932,7 +939,8 @@ Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
}

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
final double maxUsage = getMaxResourceUsageWithWeight(
loadData.getBrokerData().get(broker.get()).getLocalData(), conf);
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
/**
* Load shedding strategy which will attempt to shed exactly one bundle on brokers which are overloaded, that is, whose
* maximum system resource usage exceeds loadBalancerBrokerOverloadedThresholdPercentage. To see which resources are
* considered when determining the maximum system resource, see {@link LocalBrokerData#getMaxResourceUsage()}. A bundle
* considered when determining the maximum system resource, see
* {@link LocalBrokerData#getMaxResourceUsageWithWeight(double, double, double, double)}. A bundle
* is recommended for unloading off that broker if and only if the following conditions hold: The broker has at
* least two bundles assigned and the broker has at least one bundle that has not been unloaded recently according to
* LoadBalancerSheddingGracePeriodMinutes. The unloaded bundle will be the most expensive bundle in terms of message
Expand Down Expand Up @@ -71,7 +72,11 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
loadData.getBrokerData().forEach((broker, brokerData) -> {

final LocalBrokerData localData = brokerData.getLocalData();
final double currentUsage = localData.getMaxResourceUsage();
final double currentUsage = localData.getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwidthInResourceWeight(),
conf.getLoadBalancerBandwidthOutResourceWeight());
if (currentUsage < overloadThreshold) {
if (log.isDebugEnabled()) {
log.debug("[{}] Broker is not overloaded, ignoring at this point ({})", broker,
Expand Down

0 comments on commit 8efa68e

Please sign in to comment.