Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 28 additions & 45 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::Field;
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
use tokio::task::JoinError;
use tokio::task::{JoinError, JoinSet};
use tracing::*;

use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
Expand Down Expand Up @@ -1202,8 +1202,7 @@ pub async fn multi_index_leaf_search(
//
// It is a little bit tricky how to handle which is now the incremental_merge_collector, one
// per index, e.g. when to merge results and how to avoid lock contention.
let mut leaf_request_tasks = Vec::new();

let mut join_set = JoinSet::new();
for leaf_search_request_ref in leaf_search_request.leaf_requests.into_iter() {
let index_uri = quickwit_common::uri::Uri::from_str(
leaf_search_request
Expand All @@ -1226,7 +1225,7 @@ pub async fn multi_index_leaf_search(
})?
.clone();

let leaf_request_future = tokio::spawn({
join_set.spawn({
let storage_resolver = storage_resolver.clone();
let searcher_context = searcher_context.clone();
let search_request = search_request.clone();
Expand All @@ -1241,33 +1240,16 @@ pub async fn multi_index_leaf_search(
doc_mapper,
aggregation_limits,
)
.in_current_span()
.await
}
.in_current_span()
});
leaf_request_tasks.push(leaf_request_future);
}

let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout(
searcher_context.searcher_config.request_timeout(),
try_join_all(leaf_request_tasks),
)
.await??;
let merge_collector = make_merge_collector(&search_request, aggregation_limits)?;
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
for result in leaf_responses {
match result {
Ok(result) => {
incremental_merge_collector.add_result(result)?;
}
Err(err) => {
incremental_merge_collector.add_failed_split(SplitSearchError {
split_id: "unknown".to_string(),
error: format!("{err}"),
retryable_error: true,
});
}
}
while let Some(result) = join_set.join_next().await {
incremental_merge_collector.add_result(result??)?;
}

crate::search_thread_pool()
Expand Down Expand Up @@ -1349,9 +1331,6 @@ pub async fn single_doc_mapping_leaf_search(

let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> =
Vec::with_capacity(split_with_req.len());

let merge_collector = make_merge_collector(&request, aggregations_limits.clone())?;
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));
Expand Down Expand Up @@ -1379,6 +1358,8 @@ pub async fn single_doc_mapping_leaf_search(
split_filter: split_filter.clone(),
});

let mut join_set = JoinSet::new();
let mut split_with_task_id = Vec::with_capacity(split_with_req.len());
for ((split, search_request), permit_fut) in
split_with_req.into_iter().zip(permit_futures.into_iter())
{
Expand All @@ -1394,35 +1375,37 @@ pub async fn single_doc_mapping_leaf_search(
leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmup);
continue;
};

leaf_search_single_split_join_handles.push((
split.split_id.clone(),
tokio::spawn(
leaf_search_single_split_wrapper(
simplified_search_request,
leaf_search_context.clone(),
index_storage.clone(),
split,
leaf_split_search_permit,
aggregations_limits.clone(),
)
.in_current_span(),
),
));
let split_id = split.split_id.clone();
let handle = join_set.spawn(
leaf_search_single_split_wrapper(
simplified_search_request,
leaf_search_context.clone(),
index_storage.clone(),
split,
leaf_split_search_permit,
aggregations_limits.clone(),
)
.in_current_span(),
);
split_with_task_id.push((split_id, handle.id()));
}

// TODO we could cancel running splits when !run_all_splits and the running split can no
// longer give better results after some other split answered.
let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new();

// There is no need to use `join_all`, as these are spawned tasks.
for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles {
while let Some(leaf_search_join_result) = join_set.join_next().await {
// splits that did not panic were already added to the collector
if let Err(join_error) = leaf_search_join_handle.await {
if let Err(join_error) = leaf_search_join_result {
if join_error.is_cancelled() {
// An explicit task cancellation is not an error.
continue;
}
let position = split_with_task_id
.iter()
.position(|(_, task_id)| *task_id == join_error.id())
.unwrap();
let (split, _) = split_with_task_id.remove(position);
if join_error.is_panic() {
error!(split=%split, "leaf search task panicked");
} else {
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl SearchService for SearchServiceImpl {
.map(|req| req.split_offsets.len())
.sum::<usize>();

LeafSearchMetricsFuture {
let tracked_future = LeafSearchMetricsFuture {
tracked: multi_index_leaf_search(
self.searcher_context.clone(),
leaf_search_request,
Expand All @@ -195,8 +195,9 @@ impl SearchService for SearchServiceImpl {
start: Instant::now(),
targeted_splits: num_splits,
status: None,
}
.await
};
let timeout = self.searcher_context.searcher_config.request_timeout();
tokio::time::timeout(timeout, tracked_future).await?
}

async fn fetch_docs(
Expand Down