Skip to content

Commit f95bcae

Browse files
Add graceful handling of failures in QueryPhaseResultConsumer (#19231)
--------- Signed-off-by: Kaushal Kumar <[email protected]>
1 parent 3e747cb commit f95bcae

File tree

3 files changed

+26
-16
lines changed

3 files changed

+26
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3535
- Fix assertion error when collapsing search results with concurrent segment search enabled ([#19053](https://github.com/opensearch-project/OpenSearch/pull/19053))
3636
- Fix skip_unavailable setting changing to default during node drop issue ([#18766](https://github.com/opensearch-project/OpenSearch/pull/18766))
3737
- Fix pull-based ingestion pause state initialization during replica promotion ([#19212](https://github.com/opensearch-project/OpenSearch/pull/19212))
38+
- Fix QueryPhaseResultConsumer incomplete callback loops ([#19231](https://github.com/opensearch-project/OpenSearch/pull/19231))
3839

3940
### Dependencies
4041
- Bump `com.netflix.nebula.ospackage-base` from 12.0.0 to 12.1.0 ([#19019](https://github.com/opensearch-project/OpenSearch/pull/19019))

server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,9 @@ private MergeResult partialReduce(
217217
int numReducePhases
218218
) {
219219
checkCancellation();
220+
if (pendingMerges.hasFailure()) {
221+
return lastMerge;
222+
}
220223
// ensure consistent ordering
221224
Arrays.sort(toConsume, Comparator.comparingInt(QuerySearchResult::getShardIndex));
222225

@@ -450,6 +453,10 @@ private synchronized void onMergeFailure(Exception exc) {
450453
MergeTask task = runningTask.get();
451454
runningTask.compareAndSet(task, null);
452455
onPartialMergeFailure.accept(exc);
456+
clearPendingMerges(task);
457+
}
458+
459+
void clearPendingMerges(MergeTask task) {
453460
List<MergeTask> toCancels = new ArrayList<>();
454461
if (task != null) {
455462
toCancels.add(task);
@@ -471,10 +478,11 @@ private void resetCircuitBreakerForCurrentRequest() {
471478

472479
private void onAfterMerge(MergeTask task, MergeResult newResult, long estimatedSize) {
473480
synchronized (this) {
481+
runningTask.compareAndSet(task, null);
474482
if (hasFailure()) {
483+
task.cancel();
475484
return;
476485
}
477-
runningTask.compareAndSet(task, null);
478486
mergeResult = newResult;
479487
if (hasAggs) {
480488
// Update the circuit breaker to remove the size of the source aggregations
@@ -495,7 +503,11 @@ private void onAfterMerge(MergeTask task, MergeResult newResult, long estimatedS
495503
private void tryExecuteNext() {
496504
final MergeTask task;
497505
synchronized (this) {
498-
if (queue.isEmpty() || hasFailure() || runningTask.get() != null) {
506+
if (hasFailure()) {
507+
clearPendingMerges(null);
508+
return;
509+
}
510+
if (queue.isEmpty() || runningTask.get() != null) {
499511
return;
500512
}
501513
task = queue.poll();
@@ -511,6 +523,7 @@ protected void doRun() {
511523
try {
512524
final QuerySearchResult[] toConsume = task.consumeBuffer();
513525
if (toConsume == null) {
526+
task.cancel();
514527
return;
515528
}
516529
long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize);

server/src/test/java/org/opensearch/action/search/SearchPhaseControllerTests.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,8 +1747,11 @@ public void testCancellationWithoutCircuitBreaker() throws Exception {
17471747
int batchedReduceSize = randomIntBetween(2, expectedNumResults - 1);
17481748
SearchRequest request = getAggregationSearchRequestWithBatchedReduceSize(batchedReduceSize);
17491749
AssertingCircuitBreaker circuitBreaker = new AssertingCircuitBreaker(CircuitBreaker.REQUEST);
1750-
AtomicInteger checkCount = new AtomicInteger(0);
1751-
int cancelAfter = expectedNumResults / 2;
1750+
// To make it deterministic, we can count the number of times the partialReduce and reduce are called
1751+
// The exception is only thrown during the call to reduce which will happen once all shard level
1752+
// results have arrived
1753+
int partialReduceMethodCallCount = expectedNumResults / batchedReduceSize;
1754+
AtomicInteger checkCount = new AtomicInteger(expectedNumResults + partialReduceMethodCallCount);
17521755

17531756
QueryPhaseResultConsumer consumer = searchPhaseController.newSearchPhaseResults(
17541757
fixedExecutor,
@@ -1758,7 +1761,7 @@ public void testCancellationWithoutCircuitBreaker() throws Exception {
17581761
expectedNumResults,
17591762
exc -> {},
17601763
() -> {
1761-
return checkCount.incrementAndGet() > cancelAfter;
1764+
return checkCount.decrementAndGet() <= 0;
17621765
}
17631766
);
17641767

@@ -1775,9 +1778,8 @@ public void testCancellationDoesNotMaskCircuitBreakerException() throws Exceptio
17751778

17761779
// making sure circuit breaker trips first
17771780
circuitBreaker.shouldBreak.set(true);
1778-
AtomicInteger checkCount = new AtomicInteger(0);
1779-
int cancelAfter = expectedNumResults + 1;
1780-
1781+
int partialReduceMethodCallCount = expectedNumResults / batchedReduceSize;
1782+
AtomicInteger checkCount = new AtomicInteger(expectedNumResults + partialReduceMethodCallCount);
17811783
QueryPhaseResultConsumer consumer = searchPhaseController.newSearchPhaseResults(
17821784
fixedExecutor,
17831785
circuitBreaker,
@@ -1786,7 +1788,7 @@ public void testCancellationDoesNotMaskCircuitBreakerException() throws Exceptio
17861788
expectedNumResults,
17871789
exc -> {},
17881790
() -> {
1789-
return checkCount.incrementAndGet() > cancelAfter;
1791+
return checkCount.decrementAndGet() <= 0;
17901792
}
17911793
);
17921794

@@ -1826,13 +1828,7 @@ private static void consumeShardLevelQueryPhaseResultsAsync(int expectedNumResul
18261828
result.setShardIndex(index);
18271829
result.size(1);
18281830

1829-
try {
1830-
consumer.consumeResult(result, latch::countDown);
1831-
} catch (Exception e) {
1832-
// Ensure latch counts down even on cancellation
1833-
latch.countDown();
1834-
// Don't rethrow - let the thread complete normally
1835-
}
1831+
consumer.consumeResult(result, latch::countDown);
18361832
});
18371833
threads[index].start();
18381834
}

0 commit comments

Comments
 (0)