Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
Pulsar version: 3.0.12 + cherry-picked #22765
NOTE: This bug is also present in master.
Issue Description
The top-K bundle selection fails because sorting fails inside the partition sort algorithm it relies on (`TopKBundles.partitionSort`).
Partition sort uses the custom `Comparable` implementations of `NamespaceBundleStats` and `BundleData`, which compare throughput, connections, cache size, etc. against defined thresholds. When two values differ by no more than the threshold, they are considered "equal" (`compareTo` returns 0). This creates transitivity violations: A can be "equal" to B and B "equal" to C while A is still less than C.
This violates the transitivity property required by Java's `Comparable` contract, so `Collections.sort()` may throw an `IllegalArgumentException` with the message "Comparison method violates its general contract".
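The transitivity problem can be shown in isolation. The sketch below uses a hypothetical `ThresholdComparatorDemo` class (not part of Pulsar) holding a single threshold comparison modeled on `compareByBandwidthIn`, with an assumed threshold of 1e5. It checks whether three values form an "equal, equal, but not equal" chain.

```java
import java.util.Comparator;

/**
 * Hypothetical, simplified stand-in for a threshold-based comparison such as
 * NamespaceBundleStats#compareByBandwidthIn. This is NOT the Pulsar class.
 */
final class ThresholdComparatorDemo {

    // Assumed threshold, mirroring the 1e5 throughput threshold from the issue.
    static final double THRESHOLD = 1e5;

    // Values that differ by no more than THRESHOLD are reported as "equal" (0);
    // otherwise they are ordered numerically.
    static final Comparator<Double> BY_THRESHOLD = (a, b) ->
            Math.abs(a - b) > THRESHOLD ? Double.compare(a, b) : 0;

    // Returns true when a == b and b == c under the comparator but a != c,
    // i.e. the "equality" induced by the comparator is not transitive.
    static boolean violatesTransitivity(double a, double b, double c) {
        return BY_THRESHOLD.compare(a, b) == 0
                && BY_THRESHOLD.compare(b, c) == 0
                && BY_THRESHOLD.compare(a, c) != 0;
    }

    public static void main(String[] args) {
        // 0 and 60,000 differ by less than 1e5, as do 60,000 and 120,000,
        // but 0 and 120,000 differ by more than 1e5.
        System.out.println(violatesTransitivity(0.0, 60_000.0, 120_000.0)); // prints "true"
    }
}
```

With these values, 0 ~ 60,000 and 60,000 ~ 120,000, yet 0 < 120,000. TimSort (behind `Collections.sort()`) may detect such inconsistencies while merging runs and then throw the exception shown below.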
Due to this failure, the `writeBundleDataOnZooKeeper` job in the modular load manager (in which the leader broker writes bundle data aggregated from all brokers to the metadata store) may fail. This can degrade production clusters through failed or inconsistent load-balancing decisions.
Error messages
Error Log:
`2025-09-12T22:29:58.832386492+05:30 16:59:58.832 [pulsar-load-manager-1-1] WARN org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask - Error write resource quota - java.lang.IllegalArgumentException: Comparison method violates its general contract`
Reproducing the issue
Unit test to validate the bug
```java
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.testng.annotations.Test;

@Test
public void testPartitionSortTransitivityIssue() {
    Random rnd = new Random(0);
    List<NamespaceBundleStats> stats = new ArrayList<>();
    for (int i = 0; i < 1000; ++i) {
        NamespaceBundleStats s = new NamespaceBundleStats();
        // Spread values around the comparison thresholds (e.g. 1e5 for throughput),
        // so that many pairs compare as "equal" while others do not.
        s.msgThroughputIn = 4 * 75000 * rnd.nextDouble();
        s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
        s.msgRateIn = 4 * 75 * rnd.nextDouble();
        s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
        s.topics = i;
        s.consumerCount = i;
        s.producerCount = 4 * rnd.nextInt(375);
        s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
        stats.add(s);
    }
    List<Map.Entry<String, ? extends Comparable>> bundleEntries = new ArrayList<>();
    for (NamespaceBundleStats s : stats) {
        bundleEntries.add(new AbstractMap.SimpleEntry<>("bundle-" + s.msgThroughputIn, s));
    }
    try {
        TopKBundles.partitionSort(bundleEntries, 100);
        fail("should have failed with a Comparison method violation");
    } catch (IllegalArgumentException e) {
        // Verify the exception message describes the comparison contract violation
        assertTrue(e.getMessage().contains("Comparison method violates its general contract")
                        || e.getMessage().contains("transitivity")
                        || e.getMessage().contains("comparison"),
                "Expected IllegalArgumentException about comparison contract violation, got: "
                        + e.getMessage());
    }
}
```
Additional information
Problematic Code:
```java
public int compareByBandwidthIn(NamespaceBundleStats other) {
    if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) {
        return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
    }
    return 0; // ← This causes transitivity violations
}
```
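One possible direction, sketched under assumptions and not necessarily the fix adopted upstream: instead of treating any two values within the threshold as equal pairwise, map each value to a fixed bucket (`floor(value / threshold)`) and compare bucket ids. Every value lands in exactly one bucket, so "equal" becomes "same bucket", which is a true equivalence relation, and the ordering satisfies the `Comparable` contract. The class and method names below (`BucketedComparatorDemo`, `bucket`, `sortsConsistently`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch: a bucket-based comparison that respects the Comparator contract. */
final class BucketedComparatorDemo {

    // Assumed threshold, mirroring the 1e5 throughput threshold from the issue.
    static final double THRESHOLD = 1e5;

    // Each value maps to exactly one bucket, so "equal" means "same bucket",
    // which is reflexive, symmetric and transitive.
    static long bucket(double value) {
        return (long) Math.floor(value / THRESHOLD);
    }

    // Orders values by bucket id only; values in the same bucket compare as 0.
    static final Comparator<Double> BY_BUCKET =
            Comparator.comparingLong((Double v) -> bucket(v));

    // Sorts a copy with the bucketed comparator and checks that bucket ids are non-decreasing.
    static boolean sortsConsistently(List<Double> values) {
        List<Double> copy = new ArrayList<>(values);
        Collections.sort(copy, BY_BUCKET);
        for (int i = 1; i < copy.size(); i++) {
            if (bucket(copy.get(i - 1)) > bucket(copy.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Same value distribution as msgThroughputIn in the unit test above.
        Random rnd = new Random(0);
        List<Double> values = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            values.add(4 * 75000 * rnd.nextDouble());
        }
        System.out.println(sortsConsistently(values));
        // 0 and 60,000 share bucket 0, but 60,000 falls in bucket 0 and 120,000 in bucket 1,
        // so the 0 / 60,000 / 120,000 chain no longer collapses to "equal".
        System.out.println(BY_BUCKET.compare(60_000.0, 120_000.0));
    }
}
```

The trade-off is that two values on either side of a bucket boundary (for example 99,999 and 100,001) are ordered even though they are closer than the threshold. In exchange, the comparison is consistent, so chaining several such criteria lexicographically (throughput, then connections, then cache size) also stays consistent.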
Are you willing to submit a PR?
- I'm willing to submit a PR!