diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index f8aa60cf98f..5a9ca3108fd 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -41,7 +41,7 @@ use tantivy::directory::FileSlice; use tantivy::fastfield::FastFieldReaders; use tantivy::schema::Field; use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term}; -use tokio::task::JoinError; +use tokio::task::{JoinError, JoinSet}; use tracing::*; use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector}; @@ -1202,8 +1202,7 @@ pub async fn multi_index_leaf_search( // // It is a little bit tricky how to handle which is now the incremental_merge_collector, one // per index, e.g. when to merge results and how to avoid lock contention. - let mut leaf_request_tasks = Vec::new(); - + let mut join_set = JoinSet::new(); for leaf_search_request_ref in leaf_search_request.leaf_requests.into_iter() { let index_uri = quickwit_common::uri::Uri::from_str( leaf_search_request @@ -1226,7 +1225,7 @@ pub async fn multi_index_leaf_search( })? .clone(); - let leaf_request_future = tokio::spawn({ + join_set.spawn({ let storage_resolver = storage_resolver.clone(); let searcher_context = searcher_context.clone(); let search_request = search_request.clone(); @@ -1241,33 +1240,16 @@ pub async fn multi_index_leaf_search( doc_mapper, aggregation_limits, ) + .in_current_span() .await } - .in_current_span() }); - leaf_request_tasks.push(leaf_request_future); } - let leaf_responses: Vec> = tokio::time::timeout( - searcher_context.searcher_config.request_timeout(), - try_join_all(leaf_request_tasks), - ) - .await??; let merge_collector = make_merge_collector(&search_request, aggregation_limits)?; let mut incremental_merge_collector = IncrementalCollector::new(merge_collector); - for result in leaf_responses { - match result { - Ok(result) => { - incremental_merge_collector.add_result(result)?; - } - Err(err) => { - incremental_merge_collector.add_failed_split(SplitSearchError { - split_id: "unknown".to_string(), - error: format!("{err}"), - retryable_error: true, - }); - } - } + while let Some(result) = join_set.join_next().await { + incremental_merge_collector.add_result(result??)?; } crate::search_thread_pool() @@ -1349,9 +1331,6 @@ pub async fn single_doc_mapping_leaf_search( let split_filter = Arc::new(RwLock::new(split_filter)); - let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> = - Vec::with_capacity(split_with_req.len()); - let merge_collector = make_merge_collector(&request, aggregations_limits.clone())?; let incremental_merge_collector = IncrementalCollector::new(merge_collector); let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); @@ -1379,6 +1358,8 @@ pub async fn single_doc_mapping_leaf_search( split_filter: split_filter.clone(), }); + let mut join_set = JoinSet::new(); + let mut split_with_task_id = Vec::with_capacity(split_with_req.len()); for ((split, search_request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) { @@ -1394,35 +1375,37 @@ pub async fn single_doc_mapping_leaf_search( leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmup); continue; }; - - leaf_search_single_split_join_handles.push(( - split.split_id.clone(), - tokio::spawn( - leaf_search_single_split_wrapper( - simplified_search_request, - leaf_search_context.clone(), - index_storage.clone(), - split, - leaf_split_search_permit, - aggregations_limits.clone(), - ) - .in_current_span(), - ), - )); + let split_id = split.split_id.clone(); + let handle = join_set.spawn( + leaf_search_single_split_wrapper( + simplified_search_request, + leaf_search_context.clone(), + index_storage.clone(), + split, + leaf_split_search_permit, + aggregations_limits.clone(), + ) + .in_current_span(), + ); + split_with_task_id.push((split_id, handle.id())); } // TODO we could cancel running splits when !run_all_splits and the running split can no // longer give better results after some other split answered. let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new(); - // There is no need to use `join_all`, as these are spawned tasks. - for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles { + while let Some(leaf_search_join_result) = join_set.join_next().await { // splits that did not panic were already added to the collector - if let Err(join_error) = leaf_search_join_handle.await { + if let Err(join_error) = leaf_search_join_result { if join_error.is_cancelled() { // An explicit task cancellation is not an error. continue; } + let position = split_with_task_id + .iter() + .position(|(_, task_id)| *task_id == join_error.id()) + .unwrap(); + let (split, _) = split_with_task_id.remove(position); if join_error.is_panic() { error!(split=%split, "leaf search task panicked"); } else { diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 88dba389c83..3a8088cd61a 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -186,7 +186,7 @@ impl SearchService for SearchServiceImpl { .map(|req| req.split_offsets.len()) .sum::(); - LeafSearchMetricsFuture { + let tracked_future = LeafSearchMetricsFuture { tracked: multi_index_leaf_search( self.searcher_context.clone(), leaf_search_request, @@ -195,8 +195,9 @@ impl SearchService for SearchServiceImpl { start: Instant::now(), targeted_splits: num_splits, status: None, - } - .await + }; + let timeout = self.searcher_context.searcher_config.request_timeout(); + tokio::time::timeout(timeout, tracked_future).await? } async fn fetch_docs(