Skip to content

Commit 0b85cba

Browse files
committed
Revert "Updated the topN selection logic for reduce"
This reverts commit a711100.
1 parent 3e02f1a commit 0b85cba

File tree

1 file changed

+31
-16
lines changed

1 file changed

+31
-16
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -458,15 +458,21 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe
458458
}
459459
final B[] list;
460460
if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) {
461-
B[] reducedBucketsArr = createBucketsArray(reducedBuckets.size());
462-
for (int i = 0; i < reducedBuckets.size(); i++) {
463-
reducedBucketsArr[i] = reducedBuckets.get(i);
464-
}
465461
final int size = Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size());
466-
final Comparator<MultiBucketsAggregation.Bucket> cmp = order.comparator();
467462
if (size < reducedBuckets.size()) {
468-
ArrayUtil.select(reducedBucketsArr, 0, reducedBuckets.size(), size, cmp);
469-
int selectedSize = 0;
463+
Comparator<MultiBucketsAggregation.Bucket> cmp = order.comparator();
464+
B[] reducedBucketsArr = createBucketsArray(reducedBuckets.size());;
465+
for (int i = 0; i < reducedBuckets.size(); i++) {
466+
reducedBucketsArr[i] = reducedBuckets.get(i);
467+
}
468+
ArrayUtil.select(
469+
reducedBucketsArr,
470+
0,
471+
reducedBuckets.size(),
472+
size,
473+
cmp
474+
);
475+
int sz = 0;
470476
for (B bucket : reducedBucketsArr) {
471477
if (sumDocCountError == -1) {
472478
bucket.setDocCountError(-1);
@@ -475,38 +481,47 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe
475481
bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError);
476482
}
477483
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
478-
B removed = ((selectedSize == size) ? bucket : null);
484+
B removed = ((sz == size) ? bucket : null);
479485
if (removed != null) {
480486
otherDocCount += removed.getDocCount();
481487
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
482488
} else {
483-
selectedSize++;
489+
sz++;
484490
reduceContext.consumeBucketsAndMaybeBreak(1);
485491
}
486492
} else {
487493
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
488494
}
489495
}
490-
list = createBucketsArray(selectedSize);
491-
System.arraycopy(reducedBucketsArr, 0, list, 0, selectedSize);
496+
list = createBucketsArray(sz);
497+
if (sz >= 0) System.arraycopy(reducedBucketsArr, 0, list, 0, sz);
498+
Arrays.sort(list, cmp);
492499
} else {
493-
// since only else case possible is size == reducedBuckets.size() we can use the entire list of reduced buckets
494-
list = reducedBucketsArr;
495-
for (B bucket : reducedBucketsArr) {
500+
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
501+
for (B bucket : reducedBuckets) {
496502
if (sumDocCountError == -1) {
497503
bucket.setDocCountError(-1);
498504
} else {
499505
final long finalSumDocCountError = sumDocCountError;
500506
bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError);
501507
}
502508
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
503-
reduceContext.consumeBucketsAndMaybeBreak(1);
509+
B removed = ordered.insertWithOverflow(bucket);
510+
if (removed != null) {
511+
otherDocCount += removed.getDocCount();
512+
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
513+
} else {
514+
reduceContext.consumeBucketsAndMaybeBreak(1);
515+
}
504516
} else {
505517
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
506518
}
507519
}
520+
list = createBucketsArray(ordered.size());
521+
for (int i = ordered.size() - 1; i >= 0; i--) {
522+
list[i] = ordered.pop();
523+
}
508524
}
509-
Arrays.sort(list, cmp);
510525
} else {
511526
// we can prune the list on partial reduce if the aggregation is ordered by key
512527
// and not filtered (minDocCount == 0)

0 commit comments

Comments
 (0)