Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ ethnum = { version = "1.5.0" }
fallible-streaming-iterator = "0.1"
faststr = "0.2"
feature-set = { version = "0.1.1" }
flagset = "0.4"
flatbuffers = "24" # Must use the same version with arrow-ipc
flate2 = "1"
foreign_vec = "0.1.0"
Expand Down Expand Up @@ -363,10 +362,10 @@ num-derive = "0.3.3"
num-traits = "0.2.19"
num_cpus = "1.13.1"
object = "0.36.5"
object_store_opendal = "0.48.1"
object_store_opendal = { git = "https://github.com/apache/opendal", package = "object_store_opendal", rev = "f7f9990" }
once_cell = "1.15.0"
openai_api_rust = "0.1"
opendal = { version = "0.50.1", features = [
opendal = { version = "0.51", git = "https://github.com/apache/opendal", rev = "f7f9990", features = [
"layers-fastrace",
"layers-prometheus-client",
"layers-async-backtrace",
Expand Down
1 change: 0 additions & 1 deletion src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ databend-common-meta-app = { workspace = true }
databend-common-metrics = { workspace = true }
databend-common-native = { workspace = true }
databend-enterprise-storage-encryption = { workspace = true }
flagset = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
log = { workspace = true }
Expand Down
12 changes: 12 additions & 0 deletions src/common/storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use opendal::raw::LayeredAccess;
use opendal::raw::OpList;
use opendal::raw::OpRead;
use opendal::raw::OpWrite;
use opendal::raw::RpDelete;
use opendal::raw::RpList;
use opendal::raw::RpRead;
use opendal::raw::RpWrite;
Expand Down Expand Up @@ -167,6 +168,8 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
type BlockingWriter = StorageMetricsWrapper<A::BlockingWriter>;
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
type Deleter = A::Deleter;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand All @@ -193,6 +196,11 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
self.inner.list(path, args).await
}

#[async_backtrace::framed]
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner.delete().await
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner
.blocking_read(path, args)
Expand All @@ -208,6 +216,10 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.inner.blocking_list(path, args)
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner.blocking_delete()
}
}

pub struct StorageMetricsWrapper<R> {
Expand Down
37 changes: 34 additions & 3 deletions src/common/storage/src/runtime_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
type BlockingWriter = A::BlockingWriter;
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
type Deleter = RuntimeIO<A::Deleter>;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -139,13 +141,17 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
.expect("join must success")
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
let op = self.inner.clone();
let path = path.to_string();

self.runtime
.spawn(async move { op.delete(&path, args).await })
.spawn(async move { op.delete().await })
.await
.expect("join must success")
.map(|(rp, r)| {
let r = RuntimeIO::new(r, self.runtime.clone());
(rp, r)
})
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
Expand All @@ -168,6 +174,10 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.inner.blocking_list(path, args)
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner.blocking_delete()
}
}

pub struct RuntimeIO<R: 'static> {
Expand Down Expand Up @@ -200,3 +210,24 @@ impl<R: oio::Read> oio::Read for RuntimeIO<R> {
res
}
}

impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.as_mut().unwrap().delete(path, args)
}

async fn flush(&mut self) -> Result<usize> {
let mut r = self.inner.take().expect("reader must be valid");
let runtime = self.runtime.clone();

let (r, res) = runtime
.spawn(async move {
let res = r.flush().await;
(r, res)
})
.await
.expect("join must success");
self.inner = Some(r);
res
}
}
Loading
Loading