Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 59 additions & 24 deletions src/common/storage/src/runtime_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::fmt::Formatter;
use std::sync::Arc;

use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::TrySpawn;
use opendal::raw::oio;
use opendal::raw::Access;
Expand Down Expand Up @@ -183,13 +184,23 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
pub struct RuntimeIO<R: 'static> {
inner: Option<R>,
runtime: Arc<Runtime>,
spawn_task_name: String,
}

impl<R> RuntimeIO<R> {
fn new(inner: R, runtime: Arc<Runtime>) -> Self {
// pre-assemble spawn task name, to avoid calling format! in heavy read loop
let query_id = ThreadTracker::query_id();
let spawn_task_name = if let Some(id) = query_id {
format!("Running query {} IO task", id)
} else {
String::from("Running IO task")
};

Self {
inner: Some(inner),
runtime,
spawn_task_name,
}
}
}
Expand All @@ -200,10 +211,14 @@ impl<R: oio::Read> oio::Read for RuntimeIO<R> {
let runtime = self.runtime.clone();

let (r, res) = runtime
.spawn(async move {
let res = r.read().await;
(r, res)
})
.try_spawn(
async move {
let res = r.read().await;
(r, res)
},
Some(self.spawn_task_name.clone()),
)
.expect("spawn must success")
.await
.expect("join must success");
self.inner = Some(r);
Expand All @@ -217,10 +232,14 @@ impl<R: oio::Write> oio::Write for RuntimeIO<R> {
let runtime = self.runtime.clone();

let (r, res) = runtime
.spawn(async move {
let res = r.write(bs).await;
(r, res)
})
.try_spawn(
async move {
let res = r.write(bs).await;
(r, res)
},
Some(self.spawn_task_name.clone()),
)
.expect("spawn must success")
.await
.expect("join must success");
self.inner = Some(r);
Expand All @@ -232,10 +251,14 @@ impl<R: oio::Write> oio::Write for RuntimeIO<R> {
let runtime = self.runtime.clone();

let (r, res) = runtime
.spawn(async move {
let res = r.close().await;
(r, res)
})
.try_spawn(
async move {
let res = r.close().await;
(r, res)
},
Some(self.spawn_task_name.clone()),
)
.expect("spawn must success")
.await
.expect("join must success");
self.inner = Some(r);
Expand All @@ -247,10 +270,14 @@ impl<R: oio::Write> oio::Write for RuntimeIO<R> {
let runtime = self.runtime.clone();

let (r, res) = runtime
.spawn(async move {
let res = r.abort().await;
(r, res)
})
.try_spawn(
async move {
let res = r.abort().await;
(r, res)
},
Some(self.spawn_task_name.clone()),
)
.expect("spawn must success")
.await
.expect("join must success");
self.inner = Some(r);
Expand All @@ -264,10 +291,14 @@ impl<R: oio::List> oio::List for RuntimeIO<R> {
let runtime = self.runtime.clone();

let (r, res) = runtime
.spawn(async move {
let res = r.next().await;
(r, res)
})
.try_spawn(
async move {
let res = r.next().await;
(r, res)
},
Some(self.spawn_task_name.clone()),
)
.expect("spawn must success")
.await
.expect("join must success");
self.inner = Some(r);
Expand All @@ -285,10 +316,14 @@ impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
let runtime = self.runtime.clone();

let (r, res) = runtime
.spawn(async move {
let res = r.flush().await;
(r, res)
})
.try_spawn(
async move {
let res = r.flush().await;
(r, res)
},
Some(self.spawn_task_name.clone()),
)
.expect("spawn must success")
.await
.expect("join must success");
self.inner = Some(r);
Expand Down
Loading