|
38 | 38 | import java.util.concurrent.Executors;
|
39 | 39 | import java.util.concurrent.RejectedExecutionException;
|
40 | 40 | import java.util.concurrent.TimeUnit;
|
| 41 | +import java.util.concurrent.atomic.AtomicBoolean; |
41 | 42 | import java.util.concurrent.atomic.AtomicReference;
|
42 | 43 | import java.util.concurrent.locks.Lock;
|
43 | 44 | import java.util.concurrent.locks.ReentrantLock;
|
@@ -625,6 +626,7 @@ public synchronized void doLoadShedding() {
|
625 | 626 | final Multimap<String, String> bundlesToUnload = loadSheddingStrategy.findBundlesForUnloading(loadData, conf);
|
626 | 627 |
|
627 | 628 | bundlesToUnload.asMap().forEach((broker, bundles) -> {
|
| 629 | + AtomicBoolean unloadBundleForBroker = new AtomicBoolean(false); |
628 | 630 | bundles.forEach(bundle -> {
|
629 | 631 | final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
|
630 | 632 | final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
|
@@ -654,24 +656,24 @@ public synchronized void doLoadShedding() {
|
654 | 656 | pulsar.getAdminClient().namespaces()
|
655 | 657 | .unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get());
|
656 | 658 | loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
|
| 659 | + unloadBundleCount++; |
| 660 | + unloadBundleForBroker.set(true); |
657 | 661 | } catch (PulsarServerException | PulsarAdminException e) {
|
658 | 662 | log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
|
659 | 663 | }
|
660 | 664 | });
|
| 665 | + if (unloadBundleForBroker.get()) { |
| 666 | + unloadBrokerCount++; |
| 667 | + } |
661 | 668 | });
|
662 | 669 |
|
663 |
| - updateBundleUnloadingMetrics(bundlesToUnload); |
| 670 | + updateBundleUnloadingMetrics(); |
664 | 671 | }
|
665 | 672 |
|
666 | 673 | /**
|
667 | 674 | * As leader broker, update bundle unloading metrics.
|
668 |
| - * |
669 |
| - * @param bundlesToUnload |
670 | 675 | */
|
671 |
| - private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnload) { |
672 |
| - unloadBrokerCount += bundlesToUnload.keySet().size(); |
673 |
| - unloadBundleCount += bundlesToUnload.values().size(); |
674 |
| - |
| 676 | + private void updateBundleUnloadingMetrics() { |
675 | 677 | List<Metrics> metrics = new ArrayList<>();
|
676 | 678 | Map<String, String> dimensions = new HashMap<>();
|
677 | 679 |
|
|
0 commit comments