diff --git a/src/common/storage/src/runtime_layer.rs b/src/common/storage/src/runtime_layer.rs index 9f703afaee17f..2ce25bafeb78d 100644 --- a/src/common/storage/src/runtime_layer.rs +++ b/src/common/storage/src/runtime_layer.rs @@ -17,6 +17,7 @@ use std::fmt::Formatter; use std::sync::Arc; use databend_common_base::runtime::Runtime; +use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrySpawn; use opendal::raw::oio; use opendal::raw::Access; @@ -183,13 +184,23 @@ impl LayeredAccess for RuntimeAccessor { pub struct RuntimeIO { inner: Option, runtime: Arc, + spawn_task_name: String, } impl RuntimeIO { fn new(inner: R, runtime: Arc) -> Self { + // pre-assemble spawn task name, to avoid calling format! in heavy read loop + let query_id = ThreadTracker::query_id(); + let spawn_task_name = if let Some(id) = query_id { + format!("Running query {} IO task", id) + } else { + String::from("Running IO task") + }; + Self { inner: Some(inner), runtime, + spawn_task_name, } } } @@ -200,10 +211,14 @@ impl oio::Read for RuntimeIO { let runtime = self.runtime.clone(); let (r, res) = runtime - .spawn(async move { - let res = r.read().await; - (r, res) - }) + .try_spawn( + async move { + let res = r.read().await; + (r, res) + }, + Some(self.spawn_task_name.clone()), + ) + .expect("spawn must success") .await .expect("join must success"); self.inner = Some(r); @@ -217,10 +232,14 @@ impl oio::Write for RuntimeIO { let runtime = self.runtime.clone(); let (r, res) = runtime - .spawn(async move { - let res = r.write(bs).await; - (r, res) - }) + .try_spawn( + async move { + let res = r.write(bs).await; + (r, res) + }, + Some(self.spawn_task_name.clone()), + ) + .expect("spawn must success") .await .expect("join must success"); self.inner = Some(r); @@ -232,10 +251,14 @@ impl oio::Write for RuntimeIO { let runtime = self.runtime.clone(); let (r, res) = runtime - .spawn(async move { - let res = r.close().await; - (r, res) - }) + .try_spawn( + async move { + let res = r.close().await; + (r, res) + }, + Some(self.spawn_task_name.clone()), + ) + .expect("spawn must success") .await .expect("join must success"); self.inner = Some(r); @@ -247,10 +270,14 @@ impl oio::Write for RuntimeIO { let runtime = self.runtime.clone(); let (r, res) = runtime - .spawn(async move { - let res = r.abort().await; - (r, res) - }) + .try_spawn( + async move { + let res = r.abort().await; + (r, res) + }, + Some(self.spawn_task_name.clone()), + ) + .expect("spawn must success") .await .expect("join must success"); self.inner = Some(r); @@ -264,10 +291,14 @@ impl oio::List for RuntimeIO { let runtime = self.runtime.clone(); let (r, res) = runtime - .spawn(async move { - let res = r.next().await; - (r, res) - }) + .try_spawn( + async move { + let res = r.next().await; + (r, res) + }, + Some(self.spawn_task_name.clone()), + ) + .expect("spawn must success") .await .expect("join must success"); self.inner = Some(r); @@ -285,10 +316,14 @@ impl oio::Delete for RuntimeIO { let runtime = self.runtime.clone(); let (r, res) = runtime - .spawn(async move { - let res = r.flush().await; - (r, res) - }) + .try_spawn( + async move { + let res = r.flush().await; + (r, res) + }, + Some(self.spawn_task_name.clone()), + ) + .expect("spawn must success") .await .expect("join must success"); self.inner = Some(r);