Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-364: Introduce a new load balance algorithm AvgShedder #22949

Merged
merged 31 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8f79af0
add AvgShedder.
thetumbled Jun 20, 2024
be38d4d
fix typo.
thetumbled Jun 20, 2024
a153eb0
Merge remote-tracking branch 'apache/master' into AvgShedder
thetumbled Jun 27, 2024
7944159
replace bundle name + random number with random number only.
thetumbled Jun 27, 2024
4cf4f2b
add test code.
thetumbled Jun 28, 2024
99caa97
add test code.
thetumbled Jun 28, 2024
b5e444b
update conf.
thetumbled Jun 28, 2024
6d4678a
fix typo.
thetumbled Jun 28, 2024
59907a8
add test code.
thetumbled Jun 28, 2024
0ac2b99
fix checkstyle.
thetumbled Jun 28, 2024
1bf6346
fix header.
thetumbled Jun 28, 2024
d2dbf6c
fix checkstyle.
thetumbled Jun 28, 2024
1b60887
fix checkstyle.
thetumbled Jun 28, 2024
c2f6123
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalan…
thetumbled Jul 5, 2024
00e5625
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalan…
thetumbled Jul 5, 2024
1c5afe9
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalan…
thetumbled Jul 5, 2024
4f4ae3e
improve.
thetumbled Jul 5, 2024
64de192
fix check style.
thetumbled Jul 5, 2024
bec01e6
throw exception.
thetumbled Jul 5, 2024
e438be4
fix checkstyle.
thetumbled Jul 6, 2024
4cfbbda
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalan…
thetumbled Jul 8, 2024
5eac740
fix deprecated conf.
thetumbled Jul 8, 2024
eca4027
improve log.
thetumbled Jul 8, 2024
2fddb77
refactor to some sub methods.
thetumbled Jul 8, 2024
f731a25
refactor map.
thetumbled Jul 8, 2024
11d56c8
fix check style.
thetumbled Jul 10, 2024
0fc8bc5
avoid create list manually.
thetumbled Jul 11, 2024
65b8941
reduce one find operation.
thetumbled Jul 11, 2024
dfb8765
add test to cover getExpectedBroker.
thetumbled Jul 11, 2024
4a0f64c
avoid static Random object .
thetumbled Jul 11, 2024
26f5479
make selectedBundlesCache locally.
thetumbled Jul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,25 @@ loadBalancerBundleUnloadMinThroughputThreshold=10
# Time to wait for the unloading of a namespace bundle
namespaceBundleUnloadingTimeoutMs=60000

# configuration for AvgShedder, a new shedding and placement strategy
# The low threshold for the difference between the highest and lowest loaded brokers.
loadBalancerAvgShedderLowThreshold = 15

# The high threshold for the difference between the highest and lowest loaded brokers.
loadBalancerAvgShedderHighThreshold = 40

# The number of times the low threshold is triggered before the bundle is unloaded.
loadBalancerAvgShedderHitCountLowThreshold = 8

# The number of times the high threshold is triggered before the bundle is unloaded.
loadBalancerAvgShedderHitCountHighThreshold = 2

# In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio.
# For AvgShedder, recommend to set to 0.5, so that it will distribute the load evenly
# between the highest and lowest brokers.
maxUnloadPercentage = 0.2


### --- Load balancer extension --- ###

# Option to enable the debug mode for the load balancer logics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2395,21 +2395,51 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload."
doc = "The low threshold for the difference between the highest and lowest loaded brokers."
)
private int loadBalancerAvgShedderLowThreshold = 15;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The high threshold for the difference between the highest and lowest loaded brokers."
)
private int loadBalancerAvgShedderHighThreshold = 40;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The number of times the low threshold is triggered before the bundle is unloaded."
)
private int loadBalancerAvgShedderHitCountLowThreshold = 8;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The number of times the high threshold is triggered before the bundle is unloaded."
)
private int loadBalancerAvgShedderHitCountHighThreshold = 2;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum message that triggers unload."
)
private int minUnloadMessage = 1000;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum throughput that triggers unload."
)
private int minUnloadMessageThroughput = 1 * 1024 * 1024;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
doc = "In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio."
+ "For AvgShedder, recommend to set to 0.5, so that it will distribute the load "
+ "evenly between the highest and lowest brokers."
)
private double maxUnloadPercentage = 0.2;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrategy {
private static Logger log = LoggerFactory.getLogger(AvgShedder.class);
thetumbled marked this conversation as resolved.
Show resolved Hide resolved

// map bundle to broker.
private final Map<BundleData, String> bundleBrokerMap = new HashMap<>();
// map broker to Scores. scores:0-100
private final Map<String, Double> brokerScoreMap = new HashMap<>();
// map broker hit count for high threshold/low threshold
private final Map<String, Integer> brokerHitCountForHigh = new HashMap<>();
private final Map<String, Integer> brokerHitCountForLow = new HashMap<>();

// result returned by shedding, map broker to bundles.
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();

private static final double MB = 1024 * 1024;
private static final Random random = new Random();

public AvgShedder() {
}

thetumbled marked this conversation as resolved.
Show resolved Hide resolved
@Override
public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
selectedBundlesCache.clear();
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
final double minThroughputThreshold = conf.getMinUnloadMessageThroughput();
final double minMsgThreshold = conf.getMinUnloadMessage();
final double maxUnloadPercentage = conf.getMaxUnloadPercentage();

final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
final double lowThreshold = conf.getLoadBalancerAvgShedderLowThreshold();
final double highThreshold = conf.getLoadBalancerAvgShedderHighThreshold();
final int hitCountHighThreshold = conf.getLoadBalancerAvgShedderHitCountHighThreshold();
final int hitCountLowThreshold = conf.getLoadBalancerAvgShedderHitCountLowThreshold();
if (log.isDebugEnabled()) {
log.debug("highThreshold:{}, lowThreshold:{}, hitCountHighThreshold:{}, hitCountLowThreshold:{}, "
+ "minMsgThreshold:{}, minThroughputThreshold:{}",
highThreshold, lowThreshold, hitCountHighThreshold, hitCountLowThreshold,
minMsgThreshold, minThroughputThreshold);
}

List<String> brokers = new LinkedList<>();
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
brokerScoreMap.clear();
// calculate scores of brokers.
for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
LocalBrokerData localBrokerData = entry.getValue().getLocalData();
String broker = entry.getKey();
brokers.add(broker);
Double score = calculateScores(broker, localBrokerData, conf);
brokerScoreMap.put(broker, score);
// collect data to analyze the relations between scores and throughput and messageRate.
log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score,
localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(),
localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut());
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
}

// sort brokers by scores.
Collections.sort(brokers, (e1, e2) -> (int) (brokerScoreMap.get(e1) - brokerScoreMap.get(e2)));
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
if (log.isDebugEnabled()) {
log.debug("sorted broker list:{}", brokers);
}

// find broker pairs for shedding.
List<Pair<String, String>> pairs = new LinkedList<>();
int i = 0, j = brokers.size() - 1;
while (i <= j) {
String maxBroker = brokers.get(j);
String minBroker = brokers.get(i);
if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < lowThreshold) {
brokerHitCountForHigh.remove(maxBroker);
brokerHitCountForHigh.remove(minBroker);

brokerHitCountForLow.remove(maxBroker);
brokerHitCountForLow.remove(minBroker);
} else {
pairs.add(Pair.of(minBroker, maxBroker));
if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < highThreshold) {
brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1);
brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1);

brokerHitCountForHigh.remove(maxBroker);
brokerHitCountForHigh.remove(minBroker);
} else {
brokerHitCountForLow.put(minBroker, brokerHitCountForLow.getOrDefault(minBroker, 0) + 1);
brokerHitCountForLow.put(maxBroker, brokerHitCountForLow.getOrDefault(maxBroker, 0) + 1);

brokerHitCountForHigh.put(minBroker, brokerHitCountForHigh.getOrDefault(minBroker, 0) + 1);
brokerHitCountForHigh.put(maxBroker, brokerHitCountForHigh.getOrDefault(maxBroker, 0) + 1);
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
}
}
i++;
j--;
}

log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}",
brokerHitCountForHigh, brokerHitCountForLow);

if (pairs.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("there is no any overload broker.no need to shedding bundles.");
}
brokerHitCountForHigh.clear();
brokerHitCountForLow.clear();
return selectedBundlesCache;
}

// choosing bundles to unload.
for (Pair<String, String> pair : pairs) {
String overloadedBroker = pair.getRight();
String underloadedBroker = pair.getLeft();

// check hit count for high threshold and low threshold.
if (!(brokerHitCountForHigh.getOrDefault(underloadedBroker, 0) >= hitCountHighThreshold)
&& !(brokerHitCountForHigh.getOrDefault(overloadedBroker, 0) >= hitCountHighThreshold)
&& !(brokerHitCountForLow.getOrDefault(underloadedBroker, 0) >= hitCountLowThreshold)
&& !(brokerHitCountForLow.getOrDefault(overloadedBroker, 0) >= hitCountLowThreshold)) {
continue;
}

// if hit, remove entry.
brokerHitCountForHigh.remove(underloadedBroker);
brokerHitCountForHigh.remove(overloadedBroker);

brokerHitCountForLow.remove(underloadedBroker);
brokerHitCountForLow.remove(overloadedBroker);

// calculate how much throughput to unload.
LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData();
LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData();

double minMsgRate = minLocalBrokerData.getMsgRateIn() + minLocalBrokerData.getMsgRateOut();
double maxMsgRate = maxLocalBrokerData.getMsgRateIn() + maxLocalBrokerData.getMsgRateOut();

double minThroughput = minLocalBrokerData.getMsgThroughputIn() + minLocalBrokerData.getMsgThroughputOut();
double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut();

double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage;
double throughputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage;

boolean isMsgRateToOffload;
MutableDouble trafficMarkedToOffload = new MutableDouble(0);

if (msgRequiredFromUnloadedBundles > minMsgThreshold) {
isMsgRateToOffload = true;
trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles);
} else if (throughputRequiredFromUnloadedBundles > minThroughputThreshold) {
isMsgRateToOffload = false;
trafficMarkedToOffload.setValue(throughputRequiredFromUnloadedBundles);
} else {
log.info(
"broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is "
+ "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s"
+ " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.",
overloadedBroker, underloadedBroker, throughputRequiredFromUnloadedBundles / MB,
minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold);
continue;
}

if (maxLocalBrokerData.getBundles().size() == 1) {
log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
maxLocalBrokerData.getBundles().iterator().next(), pair.getRight());
} else if (maxLocalBrokerData.getBundles().size() == 0) {
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
log.warn("Broker {} is overloaded despite having no bundles", pair.getRight());
}

// do shedding
log.info(
"broker:[{}] is planning to shed bundles to broker:[{}]. "
+ "maxBroker stat:scores:{}, throughput:{}, msgRate:{}. "
+ "minBroker stat:scores:{}, throughput:{}, msgRate:{}. "
+ "isMsgRateToOffload:{}, trafficMarkedToOffload:{}",
overloadedBroker, underloadedBroker, brokerScoreMap.get(overloadedBroker), maxThroughput,
maxMsgRate, brokerScoreMap.get(underloadedBroker), minThroughput, minMsgRate,
isMsgRateToOffload, trafficMarkedToOffload);

loadData.getBundleDataForLoadShedding().entrySet().stream().filter(e ->
maxLocalBrokerData.getBundles().contains(e.getKey())
).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getKey())
).map((e) -> {
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double traffic = isMsgRateToOffload
? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut()
: shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(e, traffic);
}).sorted((e1, e2) ->
Double.compare(e2.getRight(), e1.getRight())
).forEach(e -> {
Map.Entry<String, BundleData> bundle = e.getLeft();
double traffic = e.getRight();
if (traffic > 0 && traffic <= trafficMarkedToOffload.getValue()) {
selectedBundlesCache.put(overloadedBroker, bundle.getKey());
bundleBrokerMap.put(bundle.getValue(), underloadedBroker);
trafficMarkedToOffload.add(-traffic);
if (log.isDebugEnabled()) {
log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}",
bundle, isMsgRateToOffload, traffic);
}
}
});
}
return selectedBundlesCache;
}

@Override
public void onActiveBrokersChange(Set<String> activeBrokers) {
LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers);
}

private Double calculateScores(String broker, LocalBrokerData localBrokerData, final ServiceConfiguration conf) {
double score = localBrokerData.getMaxResourceUsageWithWeight(
conf.getLoadBalancerCPUResourceWeight(),
conf.getLoadBalancerDirectMemoryResourceWeight(),
conf.getLoadBalancerBandwithInResourceWeight(),
conf.getLoadBalancerBandwithOutResourceWeight()) * 100;
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
return score;
}
thetumbled marked this conversation as resolved.
Show resolved Hide resolved

@Override
public Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
ServiceConfiguration conf) {
if (!bundleBrokerMap.containsKey(bundleToAssign) || !candidates.contains(bundleBrokerMap.get(bundleToAssign))) {
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
// cluster initializing or broker is shutdown
if (log.isDebugEnabled()) {
if (!bundleBrokerMap.containsKey(bundleToAssign)) {
log.debug("cluster is initializing");
} else {
log.debug("expected broker:{} is shutdown, candidates:{}", bundleBrokerMap.get(bundleToAssign),
candidates);
}
}
String broker = getExpectedBroker(candidates, bundleToAssign);
bundleBrokerMap.put(bundleToAssign, broker);
return Optional.of(broker);
} else {
return Optional.of(bundleBrokerMap.get(bundleToAssign));
}
}

private static long hash(String key) {
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
return Hashing.crc32().hashString(key, StandardCharsets.UTF_8).padToLong();
}

private static String getExpectedBroker(Collection<String> brokers, BundleData bundle) {
List<String> sortedBrokers = new ArrayList<>(brokers);
Collections.sort(sortedBrokers);

try {
// use random number as input of hashing function to avoid special case that,
// if there is 4 brokers running in the cluster,and add broker5,and shutdown broker3,
// then all bundles belonging to broker3 will be loaded on the same broker.
final long hashcode = hash(String.valueOf(random.nextInt()));
final int index = (int) (Math.abs(hashcode) % sortedBrokers.size());
if (log.isDebugEnabled()) {
log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}",
sortedBrokers, bundle, hashcode, index);
}
return sortedBrokers.get(index);
} catch (Throwable e) {
// theoretically this logic branch should not be executed
log.error("Bundle format of {} is invalid", bundle, e);
return sortedBrokers.get(Math.abs(bundle.hashCode()) % sortedBrokers.size());
}
}
}
Loading
Loading