diff --git a/Cargo.lock b/Cargo.lock
index 0b92e5e8766ab..29ce3e97dba9e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2961,7 +2961,7 @@ dependencies = [
  "databend-storages-common-table-meta",
  "limits-rs",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "serde",
  "serde_json",
  "serfig",
@@ -3238,7 +3238,7 @@ dependencies = [
  "libc",
  "object",
  "once_cell",
- "opendal",
+ "opendal 0.51.0",
  "parquet",
  "paste",
  "prost",
@@ -3586,7 +3586,7 @@ dependencies = [
  "maplit",
  "num-derive",
  "num-traits",
- "opendal",
+ "opendal 0.51.0",
  "paste",
  "prost",
  "serde",
@@ -3824,7 +3824,7 @@ dependencies = [
  "lz4",
  "match-template",
  "num",
- "opendal",
+ "opendal 0.51.0",
  "rand",
  "ringbuffer",
  "roaring",
@@ -4035,7 +4035,7 @@ dependencies = [
  "log",
  "num-derive",
  "num-traits",
- "opendal",
+ "opendal 0.51.0",
  "parking_lot 0.12.3",
  "prqlc",
  "rand",
@@ -4068,11 +4068,10 @@ dependencies = [
  "databend-common-metrics",
  "databend-common-native",
  "databend-enterprise-storage-encryption",
- "flagset",
  "futures",
  "http 1.1.0",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "parquet",
  "prometheus-client",
  "regex",
@@ -4181,7 +4180,7 @@ dependencies = [
  "itertools 0.13.0",
  "jsonb",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "parking_lot 0.12.3",
  "parquet",
  "rand",
@@ -4229,7 +4228,7 @@ dependencies = [
  "futures",
  "hive_metastore",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "parquet",
  "recursive",
  "serde",
@@ -4339,7 +4338,7 @@ dependencies = [
  "databend-storages-common-table-meta",
  "futures-util",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "orc-rust",
  "serde",
  "serde_json",
@@ -4375,7 +4374,7 @@ dependencies = [
  "ethnum",
  "futures",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "parquet",
  "rand",
  "serde",
@@ -4420,7 +4419,7 @@ dependencies = [
  "databend-common-storages-parquet",
  "databend-storages-common-blocks",
  "databend-storages-common-table-meta",
- "opendal",
+ "opendal 0.51.0",
  "parquet",
  "serde",
  "serde_json",
@@ -4461,7 +4460,7 @@ dependencies = [
  "enum-as-inner",
  "futures",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "parquet",
  "serde",
  "serde_json",
@@ -4526,7 +4525,7 @@ dependencies = [
  "jsonb",
  "log",
  "once_cell",
- "opendal",
+ "opendal 0.51.0",
  "parking_lot 0.12.3",
  "regex",
  "serde",
@@ -4755,7 +4754,7 @@ dependencies = [
  "jsonb",
  "jwt-simple",
  "log",
- "opendal",
+ "opendal 0.51.0",
  "tantivy",
  "tempfile",
 ]
@@ -5110,7 +5109,7 @@ dependencies = [
  "mysql_async",
  "naive-cityhash",
  "num_cpus",
- "opendal",
+ "opendal 0.51.0",
  "opensrv-mysql",
  "opentelemetry",
  "opentelemetry_sdk",
@@ -5293,7 +5292,7 @@ dependencies = [
  "fastrace",
  "futures",
  "log",
- "opendal",
+ "opendal 0.51.0",
 ]
 
 [[package]]
@@ -8404,7 +8403,7 @@ dependencies = [
  "murmur3",
  "num-bigint",
  "once_cell",
- "opendal",
+ "opendal 0.50.1",
  "ordered-float 4.5.0",
  "parquet",
  "paste",
@@ -10327,9 +10326,8 @@ dependencies = [
 
 [[package]]
 name = "object_store_opendal"
-version = "0.48.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d37d6ca6cad56e446feab00d1378b0aae992f71bd9e04fe0454e8937ce1f9c9"
+version = "0.48.3"
+source = "git+https://github.com/apache/opendal?rev=f7f9990#f7f9990f95146400c3aef8afc11804e4a6e3afe3"
 dependencies = [
  "async-trait",
  "bytes",
@@ -10337,7 +10335,7 @@ dependencies = [
  "futures",
  "futures-util",
  "object_store",
- "opendal",
+ "opendal 0.51.0",
  "pin-project",
  "tokio",
 ]
@@ -10382,6 +10380,35 @@ name = "opendal"
 version = "0.50.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "213222b6c86949314d8f51acb26d8241e7c8dd0879b016a79471d49f21ee592f"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "backon",
+ "base64 0.22.1",
+ "bytes",
+ "chrono",
+ "crc32c",
+ "flagset",
+ "futures",
+ "getrandom",
+ "http 1.1.0",
+ "log",
+ "md-5",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.36.1",
+ "reqsign",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "tokio",
+ "uuid",
+]
+
+[[package]]
+name = "opendal"
+version = "0.51.0"
+source = "git+https://github.com/apache/opendal?rev=f7f9990#f7f9990f95146400c3aef8afc11804e4a6e3afe3"
 dependencies = [
  "anyhow",
  "async-backtrace",
@@ -10392,7 +10419,6 @@ dependencies = [
  "chrono",
  "crc32c",
  "fastrace",
- "flagset",
  "futures",
  "getrandom",
  "hdrs",
@@ -12172,9 +12198,9 @@ checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690"
 
 [[package]]
 name = "reqsign"
-version = "0.16.0"
+version = "0.16.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa"
+checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 4257c226fa3fe..9c98ea6278d3c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -291,7 +291,6 @@ ethnum = { version = "1.5.0" }
 fallible-streaming-iterator = "0.1"
 faststr = "0.2"
 feature-set = { version = "0.1.1" }
-flagset = "0.4"
 flatbuffers = "24" # Must use the same version with arrow-ipc
 flate2 = "1"
 foreign_vec = "0.1.0"
@@ -363,10 +362,10 @@ num-derive = "0.3.3"
 num-traits = "0.2.19"
 num_cpus = "1.13.1"
 object = "0.36.5"
-object_store_opendal = "0.48.1"
+object_store_opendal = { git = "https://github.com/apache/opendal", package = "object_store_opendal", rev = "f7f9990" }
 once_cell = "1.15.0"
 openai_api_rust = "0.1"
-opendal = { version = "0.50.1", features = [
+opendal = { version = "0.51", git = "https://github.com/apache/opendal", rev = "f7f9990", features = [
     "layers-fastrace",
     "layers-prometheus-client",
     "layers-async-backtrace",
diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml
index f1990cdb5d734..da306bc35648c 100644
--- a/src/common/storage/Cargo.toml
+++ b/src/common/storage/Cargo.toml
@@ -24,7 +24,6 @@ databend-common-meta-app = { workspace = true }
 databend-common-metrics = { workspace = true }
 databend-common-native = { workspace = true }
 databend-enterprise-storage-encryption = { workspace = true }
-flagset = { workspace = true }
 futures = { workspace = true }
 http = { workspace = true }
 log = { workspace = true }
diff --git a/src/common/storage/src/metrics.rs b/src/common/storage/src/metrics.rs
index f14f1d58b7ca7..19ed832d8380f 100644
--- a/src/common/storage/src/metrics.rs
+++ b/src/common/storage/src/metrics.rs
@@ -25,6 +25,7 @@ use opendal::raw::LayeredAccess;
 use opendal::raw::OpList;
 use opendal::raw::OpRead;
 use opendal::raw::OpWrite;
+use opendal::raw::RpDelete;
 use opendal::raw::RpList;
 use opendal::raw::RpRead;
 use opendal::raw::RpWrite;
@@ -167,6 +168,8 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
     type BlockingWriter = StorageMetricsWrapper<A::BlockingWriter>;
     type Lister = A::Lister;
     type BlockingLister = A::BlockingLister;
+    type Deleter = A::Deleter;
+    type BlockingDeleter = A::BlockingDeleter;
 
     fn inner(&self) -> &Self::Inner {
         &self.inner
@@ -193,6 +196,11 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
         self.inner.list(path, args).await
     }
 
+    #[async_backtrace::framed]
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        self.inner.delete().await
+    }
+
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
         self.inner
             .blocking_read(path, args)
@@ -208,6 +216,10 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
         self.inner.blocking_list(path, args)
     }
+
+    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
+        self.inner.blocking_delete()
+    }
 }
 
 pub struct StorageMetricsWrapper<T> {
diff --git a/src/common/storage/src/runtime_layer.rs b/src/common/storage/src/runtime_layer.rs
index fc662a83504e9..fb7714a096436 100644
--- a/src/common/storage/src/runtime_layer.rs
+++ b/src/common/storage/src/runtime_layer.rs
@@ -93,6 +93,8 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
     type BlockingWriter = A::BlockingWriter;
     type Lister = A::Lister;
     type BlockingLister = A::BlockingLister;
+    type Deleter = RuntimeIO<A::Deleter>;
+    type BlockingDeleter = A::BlockingDeleter;
 
     fn inner(&self) -> &Self::Inner {
         &self.inner
@@ -139,13 +141,17 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
             .expect("join must success")
     }
 
-    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         let op = self.inner.clone();
-        let path = path.to_string();
+
         self.runtime
-            .spawn(async move { op.delete(&path, args).await })
+            .spawn(async move { op.delete().await })
             .await
             .expect("join must success")
+            .map(|(rp, r)| {
+                let r = RuntimeIO::new(r, self.runtime.clone());
+                (rp, r)
+            })
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
@@ -168,6 +174,10 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
         self.inner.blocking_list(path, args)
     }
+
+    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
+        self.inner.blocking_delete()
+    }
 }
 
 pub struct RuntimeIO<R: 'static> {
@@ -200,3 +210,24 @@ impl<R: oio::Read> oio::Read for RuntimeIO<R> {
         res
     }
 }
+
+impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
+    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        self.inner.as_mut().unwrap().delete(path, args)
+    }
+
+    async fn flush(&mut self) -> Result<usize> {
+        let mut r = self.inner.take().expect("reader must be valid");
+        let runtime = self.runtime.clone();
+
+        let (r, res) = runtime
+            .spawn(async move {
+                let res = r.flush().await;
+                (r, res)
+            })
+            .await
+            .expect("join must success");
+        self.inner = Some(r);
+        res
+    }
+}
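The two storage layers above are the core of the migration: opendal 0.51 drops the per-path `delete(path, args)` and the backend-level `batch()` in favor of a deleter handle, obtained once from `delete()`, that queues paths and only performs work on `flush()`. A dependency-free sketch of that queue-then-flush contract (the toy trait mirrors `opendal::raw::oio::Delete`, whose real methods are async and return `opendal::Result`; all names here are stand-ins):

```rust
/// Toy mirror of the 0.51 deleter contract; the real trait is async.
trait Delete {
    /// Queue one path for deletion; cheap, no I/O yet.
    fn delete(&mut self, path: &str) -> Result<(), String>;
    /// Dispatch everything queued; returns how many deletes were sent.
    fn flush(&mut self) -> Result<usize, String>;
}

struct MemDeleter {
    queued: Vec<String>,
}

impl Delete for MemDeleter {
    fn delete(&mut self, path: &str) -> Result<(), String> {
        self.queued.push(path.to_string());
        Ok(())
    }

    fn flush(&mut self) -> Result<usize, String> {
        // A real backend would issue one batched request here, which is
        // where the old `batch()` behavior now lives (cf. MockDeleter in
        // the test changes further down).
        let n = self.queued.len();
        self.queued.clear();
        Ok(n)
    }
}

fn main() {
    let mut d = MemDeleter { queued: Vec::new() };
    d.delete("a/1").unwrap();
    d.delete("a/2").unwrap();
    assert_eq!(d.flush().unwrap(), 2);
}
```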
diff --git a/src/common/storage/src/stage.rs b/src/common/storage/src/stage.rs
index 005ff424d00ea..18949ef07a853 100644
--- a/src/common/storage/src/stage.rs
+++ b/src/common/storage/src/stage.rs
@@ -30,7 +30,6 @@ use futures::StreamExt;
 use futures::TryStreamExt;
 use opendal::EntryMode;
 use opendal::Metadata;
-use opendal::Metakey;
 use opendal::Operator;
 use regex::Regex;
 
@@ -66,11 +65,6 @@ impl StageFileInfo {
             creator: None,
         }
     }
-
-    /// NOTE: update this query when add new meta
-    pub fn meta_query() -> flagset::FlagSet<Metakey> {
-        Metakey::ContentLength | Metakey::ContentMd5 | Metakey::LastModified | Metakey::Etag
-    }
 }
 
 pub fn init_stage_operator(stage_info: &StageInfo) -> Result<Operator> {
@@ -278,21 +272,26 @@ impl StageFilesInfo {
         };
 
         let file_exact_stream = stream::iter(file_exact.clone().into_iter());
-        let lister = operator
-            .lister_with(path)
-            .recursive(true)
-            .metakey(StageFileInfo::meta_query())
-            .await?;
+        let lister = operator.lister_with(path).recursive(true).await?;
 
         let pattern = Arc::new(pattern);
+        let operator = operator.clone();
         let files_with_prefix = lister.filter_map(move |result| {
             let pattern = pattern.clone();
+            let operator = operator.clone();
             async move {
                 match result {
                     Ok(entry) => {
-                        let meta = entry.metadata();
-                        if check_file(&entry.path()[prefix_len..], meta.mode(), &pattern) {
-                            Some(Ok(StageFileInfo::new(entry.path().to_string(), meta)))
+                        let (path, mut meta) = entry.into_parts();
+                        if check_file(&path[prefix_len..], meta.mode(), &pattern) {
+                            if meta.etag().is_none() {
+                                meta = match operator.stat(&path).await {
+                                    Ok(meta) => meta,
+                                    Err(err) => return Some(Err(ErrorCode::from(err))),
+                                }
+                            }
+
+                            Some(Ok(StageFileInfo::new(path, &meta)))
                         } else {
                             None
                         }
@@ -389,19 +388,22 @@ fn blocking_list_files_with_pattern(
         _ => {}
     };
     let prefix_len = if path == "/" { 0 } else { path.len() };
-    let list = operator
-        .lister_with(path)
-        .recursive(true)
-        .metakey(StageFileInfo::meta_query())
-        .call()?;
+    let list = operator.lister_with(path).recursive(true).call()?;
     if files.len() == max_files {
         return Ok(files);
     }
     for obj in list {
         let obj = obj?;
-        let meta = obj.metadata();
-        if check_file(&obj.path()[prefix_len..], meta.mode(), &pattern) {
-            files.push(StageFileInfo::new(obj.path().to_string(), meta));
+        let (path, mut meta) = obj.into_parts();
+        if check_file(&path[prefix_len..], meta.mode(), &pattern) {
+            if meta.etag().is_none() {
+                meta = match operator.stat(&path) {
+                    Ok(meta) => meta,
+                    Err(err) => return Err(ErrorCode::from(err)),
+                }
+            }
+
+            files.push(StageFileInfo::new(path, &meta));
             if files.len() == max_files {
                 return Ok(files);
             }
diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
index 08c48cec34dc3..ef9e065d1be1e 100644
--- a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
+++ b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
@@ -27,7 +27,6 @@ use futures_util::TryStreamExt;
 use log::error;
 use log::info;
 use opendal::EntryMode;
-use opendal::Metakey;
 use opendal::Operator;
 
 #[async_backtrace::framed]
 pub async fn do_vacuum_drop_table(
@@ -80,12 +79,7 @@ async fn vacuum_drop_single_table(
             result?;
         }
         Some(dry_run_limit) => {
-            let mut ds = operator
-                .lister_with(&dir)
-                .recursive(true)
-                .metakey(Metakey::Mode)
-                .metakey(Metakey::ContentLength)
-                .await?;
+            let mut ds = operator.lister_with(&dir).recursive(true).await?;
 
             loop {
                 let entry = ds.try_next().await;
@@ -93,10 +87,15 @@
                     Ok(Some(de)) => {
                         let meta = de.metadata();
                         if EntryMode::FILE == meta.mode() {
+                            let mut content_length = meta.content_length();
+                            if content_length == 0 {
+                                content_length = operator.stat(de.path()).await?.content_length();
+                            }
+
                             list_files.push((
                                 table_info.name.clone(),
                                 de.name().to_string(),
-                                meta.content_length(),
+                                content_length,
                             ));
                             if list_files.len() >= dry_run_limit {
                                 break;
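stage.rs above establishes a pattern that recurs through the rest of this diff: 0.51 listers no longer take `metakey(...)`, and a listed entry may lack fields the caller needs (etag here, content length and last-modified later), so call sites fall back to a per-object `stat()` only when the cheap value is missing. A dependency-free sketch of the shape, with stand-in types (the real `Operator::stat` is async):

```rust
/// Stand-in for the slice of opendal::Metadata these call sites use.
struct Metadata {
    content_length: u64,
}

/// Stand-in for Operator::stat: costs one extra request per object.
fn stat(_path: &str) -> Result<Metadata, String> {
    Ok(Metadata { content_length: 42 })
}

fn effective_len(path: &str, listed: &Metadata) -> Result<u64, String> {
    // After the upgrade, LIST may simply not return a size (surfaced as 0),
    // so only then pay for the stat round-trip.
    if listed.content_length == 0 {
        Ok(stat(path)?.content_length)
    } else {
        Ok(listed.content_length)
    }
}

fn main() -> Result<(), String> {
    let listed = Metadata { content_length: 0 };
    assert_eq!(effective_len("1/23/_sg/abc.bin", &listed)?, 42);
    Ok(())
}
```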
diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs
index 9cbd45e76a9ba..6ff77bda4ebe0 100644
--- a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs
+++ b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs
@@ -23,7 +23,6 @@ use databend_common_catalog::table_context::AbortChecker;
 use databend_common_exception::Result;
 use databend_common_storage::DataOperator;
 use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions;
-use futures_util::stream;
 use futures_util::TryStreamExt;
 use log::info;
 use opendal::Buffer;
@@ -162,9 +161,7 @@ async fn vacuum_by_duration(
 
         if temp_files.len() <= limit {
             removed_total += temp_files.len();
-            let _ = operator
-                .remove_via(stream::iter(temp_files.into_iter()))
-                .await;
+            let _ = operator.delete_iter(temp_files).await;
         }
 
         // Log for the final total progress
@@ -232,13 +229,14 @@ async fn vacuum_by_meta_buffer(
     let remain = remain.to_vec();
 
     let cur_removed = to_be_removed.len();
-    let remove_temp_files_path = stream::iter(
-        files
-            .into_iter()
-            .filter(|f| f.starts_with(temporary_dir))
-            .take(limit),
-    );
-    let _ = operator.remove_via(remove_temp_files_path).await;
+    let _ = operator
+        .delete_iter(
+            files
+                .into_iter()
+                .filter(|f| f.starts_with(temporary_dir))
+                .take(limit),
+        )
+        .await;
 
     // update unfinished meta file
     if !remain.is_empty() {
@@ -297,9 +295,7 @@ async fn vacuum_by_list_dir(
         batches.push(dir_path.to_owned());
 
         let cur_removed = batches.len().min(limit);
-        let _ = operator
-            .remove_via(stream::iter(batches.into_iter().take(limit)))
-            .await;
+        let _ = operator.delete_iter(batches.into_iter().take(limit)).await;
 
         *removed_total += cur_removed;
         // Log for the current batch
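The three hunks above all make the same swap: `remove_via(stream::iter(...))` becomes `delete_iter`, which takes an `IntoIterator` of paths directly and batches inside the deleter. A usage sketch, assuming opendal 0.51 with the memory service enabled and a tokio runtime (both illustrative):

```rust
use opendal::services::Memory;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let op = Operator::new(Memory::default())?.finish();

    op.write("tmp/a", "x").await?;
    op.write("tmp/b", "y").await?;

    // Before: op.remove_via(futures::stream::iter(paths)).await?
    // After: one call; batching happens inside the deleter.
    op.delete_iter(vec!["tmp/a".to_string(), "tmp/b".to_string()])
        .await?;

    assert!(op.stat("tmp/a").await.is_err()); // gone
    Ok(())
}
```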
diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
index d7430eab68254..9ed435f9133df 100644
--- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
+++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
@@ -209,10 +209,8 @@ mod test_accessor {
     use opendal::raw::oio;
     use opendal::raw::oio::Entry;
     use opendal::raw::MaybeSend;
-    use opendal::raw::OpBatch;
     use opendal::raw::OpDelete;
     use opendal::raw::OpList;
-    use opendal::raw::RpBatch;
     use opendal::raw::RpDelete;
     use opendal::raw::RpList;
 
@@ -222,7 +220,7 @@ mod test_accessor {
     #[derive(Debug)]
     pub(crate) struct AccessorFaultyDeletion {
         hit_delete: AtomicBool,
-        hit_batch: AtomicBool,
+        hit_batch: Arc<AtomicBool>,
         hit_stat: AtomicBool,
         inject_delete_faulty: bool,
         inject_stat_faulty: bool,
@@ -232,7 +230,7 @@ mod test_accessor {
         pub(crate) fn with_delete_fault() -> Self {
             AccessorFaultyDeletion {
                 hit_delete: AtomicBool::new(false),
-                hit_batch: AtomicBool::new(false),
+                hit_batch: Arc::new(AtomicBool::new(false)),
                 hit_stat: AtomicBool::new(false),
                 inject_delete_faulty: true,
                 inject_stat_faulty: false,
@@ -242,7 +240,7 @@ mod test_accessor {
         pub(crate) fn with_stat_fault() -> Self {
             AccessorFaultyDeletion {
                 hit_delete: AtomicBool::new(false),
-                hit_batch: AtomicBool::new(false),
+                hit_batch: Arc::new(AtomicBool::new(false)),
                 hit_stat: AtomicBool::new(false),
                 inject_delete_faulty: false,
                 inject_stat_faulty: true,
@@ -252,14 +250,6 @@ mod test_accessor {
         pub(crate) fn hit_delete_operation(&self) -> bool {
             self.hit_delete.load(Ordering::Acquire)
         }
-
-        pub(crate) fn hit_batch_operation(&self) -> bool {
-            self.hit_batch.load(Ordering::Acquire)
-        }
-
-        pub(crate) fn hit_stat_operation(&self) -> bool {
-            self.hit_stat.load(Ordering::Acquire)
-        }
     }
 
     pub struct VecLister(Vec<String>);
@@ -281,6 +271,26 @@ mod test_accessor {
         }
     }
 
+    pub struct MockDeleter {
+        size: usize,
+        hit_batch: Arc<AtomicBool>,
+    }
+
+    impl oio::Delete for MockDeleter {
+        fn delete(&mut self, _path: &str, _args: OpDelete) -> opendal::Result<()> {
+            self.size += 1;
+            Ok(())
+        }
+
+        async fn flush(&mut self) -> opendal::Result<usize> {
+            self.hit_batch.store(true, Ordering::Release);
+
+            let n = self.size;
+            self.size = 0;
+            Ok(n)
+        }
+    }
+
     impl Access for AccessorFaultyDeletion {
         type Reader = ();
         type BlockingReader = ();
         type Writer = ();
         type BlockingWriter = ();
         type Lister = VecLister;
         type BlockingLister = ();
+        type Deleter = MockDeleter;
+        type BlockingDeleter = ();
 
         fn info(&self) -> Arc<AccessorInfo> {
             let mut info = AccessorInfo::default();
             let cap = info.full_capability_mut();
             cap.stat = true;
             cap.create_dir = true;
-            cap.batch = true;
             cap.delete = true;
+            cap.delete_max_size = Some(1000);
             cap.list = true;
 
             info.into()
         }
@@ -317,30 +329,19 @@ mod test_accessor {
             }
         }
 
-        async fn delete(&self, _path: &str, _args: OpDelete) -> opendal::Result<RpDelete> {
+        async fn delete(&self) -> opendal::Result<(RpDelete, Self::Deleter)> {
             self.hit_delete.store(true, Ordering::Release);
-            if self.inject_delete_faulty {
-                Err(opendal::Error::new(
-                    opendal::ErrorKind::Unexpected,
-                    "does not matter (delete)",
-                ))
-            } else {
-                Ok(RpDelete::default())
-            }
-        }
-
-        async fn batch(&self, _args: OpBatch) -> opendal::Result<RpBatch> {
-            self.hit_delete.store(true, Ordering::Release);
-            self.hit_batch.store(true, Ordering::Release);
-            // in our case, there are only batch deletions
+
             if self.inject_delete_faulty {
                 Err(opendal::Error::new(
                     opendal::ErrorKind::Unexpected,
                     "does not matter (delete)",
                 ))
             } else {
-                Ok(RpBatch::new(vec![]))
+                Ok((RpDelete::default(), MockDeleter {
+                    size: 0,
+                    hit_batch: self.hit_batch.clone(),
+                }))
             }
         }
@@ -460,8 +461,6 @@ async fn test_fuse_vacuum_drop_tables_dry_run_with_obj_not_found_error() -> Resu
         let tables = vec![table];
         let num_threads = 1;
         let result = vacuum_drop_tables_by_table_info(num_threads, tables, Some(usize::MAX)).await;
-        // verify that accessor.stat() was called
-        assert!(faulty_accessor.hit_stat_operation());
         // verify that errors of NotFound are swallowed
         assert!(result.is_ok());
     }
@@ -476,8 +475,6 @@ async fn test_fuse_vacuum_drop_tables_dry_run_with_obj_not_found_error() -> Resu
         let tables = vec![table.clone(), table];
         let num_threads = 2;
         let result = vacuum_drop_tables_by_table_info(num_threads, tables, Some(usize::MAX)).await;
-        // verify that accessor.stat() was called
-        assert!(faulty_accessor.hit_stat_operation());
         // verify that errors of NotFound are swallowed
         assert!(result.is_ok());
     }
@@ -530,7 +527,6 @@ async fn test_remove_files_in_batch_do_not_swallow_errors() -> Result<()> {
 
     // verify that accessor.delete() was called
     assert!(faulty_accessor.hit_delete_operation());
-    assert!(faulty_accessor.hit_batch_operation());
 
     Ok(())
 }
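With `batch()` gone, the old `cap.batch` capability goes with it; backends now advertise how many queued deletes fit in one flush via `delete_max_size` (the mock above claims 1000). A small sketch of reading that capability, again assuming the 0.51 API with an illustrative memory backend:

```rust
use opendal::services::Memory;
use opendal::Operator;

fn main() -> opendal::Result<()> {
    let op = Operator::new(Memory::default())?.finish();
    let cap = op.info().full_capability();
    match cap.delete_max_size {
        // delete_iter flushes in chunks no larger than this.
        Some(n) => println!("backend batches up to {n} deletes per flush"),
        None => println!("backend declares no batch limit"),
    }
    Ok(())
}
```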
diff --git a/src/query/service/src/interpreters/interpreter_table_vacuum.rs b/src/query/service/src/interpreters/interpreter_table_vacuum.rs
index 8f4f9860865ea..1711dd2c39383 100644
--- a/src/query/service/src/interpreters/interpreter_table_vacuum.rs
+++ b/src/query/service/src/interpreters/interpreter_table_vacuum.rs
@@ -29,7 +29,6 @@ use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
 use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
 use databend_common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
 use databend_enterprise_vacuum_handler::get_vacuum_handler;
-use opendal::Metakey;
 
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
@@ -75,12 +74,16 @@ impl VacuumTableInterpreter {
         for (dir_prefix, stat) in prefix_with_stats {
             for entry in operator
                 .list_with(&format!("{}/{}/", table_data_prefix, dir_prefix))
-                .metakey(Metakey::ContentLength)
                 .await?
             {
                 if entry.metadata().is_file() {
+                    let mut content_length = entry.metadata().content_length();
+                    if content_length == 0 {
+                        content_length = operator.stat(entry.path()).await?.content_length();
+                    }
+
                     stat.0 += 1;
-                    stat.1 += entry.metadata().content_length();
+                    stat.1 += content_length;
                 }
             }
         }
diff --git a/src/query/storages/common/io/src/files.rs b/src/query/storages/common/io/src/files.rs
index 57b30512f9c43..9b51b242e0af0 100644
--- a/src/query/storages/common/io/src/files.rs
+++ b/src/query/storages/common/io/src/files.rs
@@ -99,7 +99,7 @@ impl Files {
         info!("deleting files {:?}", &locations);
 
         let num_of_files = locations.len();
-        op.remove(locations).await?;
+        op.delete_iter(locations).await?;
 
         info!(
             "deleted files, number of files {}, time used {:?}",
diff --git a/src/query/storages/fuse/src/io/snapshots.rs b/src/query/storages/fuse/src/io/snapshots.rs
index 2b720723a1f04..7a038937945e5 100644
--- a/src/query/storages/fuse/src/io/snapshots.rs
+++ b/src/query/storages/fuse/src/io/snapshots.rs
@@ -37,7 +37,6 @@ use futures_util::TryStreamExt;
 use log::info;
 use log::warn;
 use opendal::EntryMode;
-use opendal::Metakey;
 use opendal::Operator;
 
 use crate::io::MetaReaders;
@@ -360,19 +359,21 @@ impl SnapshotsIO {
         exclude_file: Option<&str>,
     ) -> Result<Vec<(String, Option<DateTime<Utc>>)>> {
         let mut file_list = vec![];
-        let mut ds = op
-            .lister_with(prefix)
-            .metakey(Metakey::Mode | Metakey::LastModified)
-            .await?;
+        let mut ds = op.lister_with(prefix).await?;
         while let Some(de) = ds.try_next().await? {
             let meta = de.metadata();
             match meta.mode() {
                 EntryMode::FILE => match exclude_file {
                     Some(path) if de.path() == path => continue,
                     _ => {
+                        let last_modified = if let Some(last_modified) = meta.last_modified() {
+                            Some(last_modified)
+                        } else {
+                            op.stat(de.path()).await?.last_modified()
+                        };
+
                         let location = de.path().to_string();
-                        let modified = meta.last_modified();
-                        file_list.push((location, modified));
+                        file_list.push((location, last_modified));
                     }
                 },
                 _ => {
diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs
index 2f2ad06df6613..380c75f036671 100644
--- a/src/query/storages/fuse/src/operations/navigate.rs
+++ b/src/query/storages/fuse/src/operations/navigate.rs
@@ -28,9 +28,7 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
 use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
 use databend_storages_common_table_meta::table::OPT_KEY_SOURCE_TABLE_ID;
 use futures::TryStreamExt;
-use log::warn;
 use opendal::EntryMode;
-use opendal::Metakey;
 
 use crate::io::MetaReaders;
 use crate::io::SnapshotHistoryReader;
@@ -362,15 +360,18 @@ impl FuseTable {
     where F: FnMut(String, DateTime<Utc>) -> bool {
         let mut file_list = vec![];
         let op = self.operator.clone();
-        let mut ds = op
-            .lister_with(&prefix)
-            .metakey(Metakey::Mode | Metakey::LastModified)
-            .await?;
+        let mut ds = op.lister_with(&prefix).await?;
         while let Some(de) = ds.try_next().await? {
             let meta = de.metadata();
             match meta.mode() {
                 EntryMode::FILE => {
-                    let modified = meta.last_modified();
+                    let modified = if let Some(v) = meta.last_modified() {
+                        Some(v)
+                    } else {
+                        let meta = op.stat(de.path()).await?;
+                        meta.last_modified()
+                    };
+
                     let location = de.path().to_string();
                     if let Some(modified) = modified {
                         if f(location.clone(), modified) {
@@ -379,7 +380,6 @@ impl FuseTable {
                     }
                 }
                 _ => {
-                    warn!("found not snapshot file in {:}, found: {:?}", prefix, de);
                     continue;
                 }
             }
diff --git a/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs b/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs
index 5c41a62cad92f..4ce9e3271094b 100644
--- a/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs
+++ b/src/query/storages/fuse/src/table_functions/fuse_time_travel_size.rs
@@ -32,7 +32,6 @@ use databend_common_expression::TableSchemaRef;
 use databend_common_expression::TableSchemaRefExt;
 use futures_util::TryStreamExt;
 use log::info;
-use opendal::Metakey;
 use opendal::Operator;
 
 use super::parse_opt_opt_args;
@@ -197,14 +196,18 @@ impl SimpleArgFunc for FuseTimeTravelSize {
 }
 
 async fn get_time_travel_size(storage_prefix: &str, op: &Operator) -> Result<u64> {
-    let mut lister = op
-        .lister_with(storage_prefix)
-        .recursive(true)
-        .metakey(Metakey::ContentLength)
-        .await?;
+    let mut lister = op.lister_with(storage_prefix).recursive(true).await?;
     let mut size = 0;
     while let Some(entry) = lister.try_next().await? {
-        size += entry.metadata().content_length();
+        // Skip directories while calculating size
+        if entry.metadata().is_dir() {
+            continue;
+        }
+        let mut content_length = entry.metadata().content_length();
+        if content_length == 0 {
+            content_length = op.stat(entry.path()).await?.content_length();
+        }
+        size += content_length;
     }
     Ok(size)
 }
diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs
index 3d505a6520e36..b2de36a1e7bc4 100644
--- a/src/query/storages/hive/hive/src/hive_table.rs
+++ b/src/query/storages/hive/hive/src/hive_table.rs
@@ -59,7 +59,6 @@ use futures::TryStreamExt;
 use log::info;
 use log::trace;
 use opendal::EntryMode;
-use opendal::Metakey;
 use opendal::Operator;
 
 use super::hive_catalog::HiveCatalog;
@@ -600,10 +599,7 @@ async fn do_list_files_from_dir(
     sem: Arc<Semaphore>,
 ) -> Result<(Vec<HivePartInfo>, Vec<String>)> {
     let _a = sem.acquire().await.unwrap();
-    let mut m = operator
-        .lister_with(&location)
-        .metakey(Metakey::Mode | Metakey::ContentLength)
-        .await?;
+    let mut m = operator.lister_with(&location).await?;
 
     let mut all_files = vec![];
     let mut all_dirs = vec![];
@@ -622,8 +618,11 @@ async fn do_list_files_from_dir(
 
         match meta.mode() {
             EntryMode::FILE => {
+                let mut length = meta.content_length();
+                if length == 0 {
+                    length = operator.stat(path).await?.content_length();
+                }
                 let location = path.to_string();
-                let length = meta.content_length();
                 all_files.push(HivePartInfo::create(location, vec![], length));
             }
             EntryMode::DIR => {
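One more 0.51 idiom before the last file: the system.temp_files rewrite below stops passing borrowed `Entry` values around and instead splits each entry into an owned `(String, Metadata)` pair with `Entry::into_parts()`, so the path can be moved into the output while the metadata stays replaceable by a `stat()` result. A stand-in sketch of why that shape helps:

```rust
/// Stand-ins for opendal::Entry / opendal::Metadata.
struct Metadata {
    is_file: bool,
}

struct Entry {
    path: String,
    metadata: Metadata,
}

impl Entry {
    /// Same shape as opendal 0.51's Entry::into_parts: consume the entry so
    /// path and metadata can be moved independently.
    fn into_parts(self) -> (String, Metadata) {
        (self.path, self.metadata)
    }
}

fn main() {
    let e = Entry {
        path: "_temp/query_1/spill_0.bin".into(),
        metadata: Metadata { is_file: true },
    };
    let (path, meta) = e.into_parts();
    // No borrow of `e` survives, so a listing loop is free to overwrite the
    // metadata from stat() and then store both halves in a Vec.
    assert!(meta.is_file);
    assert_eq!(path, "_temp/query_1/spill_0.bin");
}
```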
diff --git a/src/query/storages/system/src/temp_files_table.rs b/src/query/storages/system/src/temp_files_table.rs
index 972308cef170f..385a5a3496f8b 100644
--- a/src/query/storages/system/src/temp_files_table.rs
+++ b/src/query/storages/system/src/temp_files_table.rs
@@ -52,9 +52,9 @@ use futures::stream::Chunks;
 use futures::stream::Take;
 use futures::StreamExt;
 use opendal::operator_futures::FutureLister;
-use opendal::Entry;
 use opendal::Lister;
-use opendal::Metakey;
+use opendal::Metadata;
+use opendal::Operator;
 
 use crate::table::SystemTablePart;
@@ -155,16 +155,13 @@ impl TempFilesTable {
         let limit = push_downs.as_ref().and_then(|x| x.limit);
 
         let operator = DataOperator::instance().operator();
-        let lister = operator
-            .lister_with(&location_prefix)
-            .recursive(true)
-            .metakey(Metakey::LastModified | Metakey::ContentLength);
+        let lister = operator.lister_with(&location_prefix).recursive(true);
 
         let stream = {
             let prefix = location_prefix.clone();
             let mut counter = 0;
             let ctx = ctx.clone();
-            let builder = ListerStreamSourceBuilder::with_lister_fut(lister);
+            let builder = ListerStreamSourceBuilder::with_lister_fut(operator, lister);
             builder
                 .limit_opt(limit)
                 .chunk_size(MAX_BATCH_SIZE)
@@ -208,15 +205,17 @@ impl TempFilesTable {
         )
     }
 
-    fn block_from_entries(location_prefix: &str, entries: Vec<Entry>) -> Result<DataBlock> {
+    fn block_from_entries(
+        location_prefix: &str,
+        entries: Vec<(String, Metadata)>,
+    ) -> Result<DataBlock> {
         let num_items = entries.len();
         let mut temp_files_name: Vec<String> = Vec::with_capacity(num_items);
         let mut temp_files_content_length = Vec::with_capacity(num_items);
         let mut temp_files_last_modified = Vec::with_capacity(num_items);
-        for entry in entries {
-            let metadata = entry.metadata();
+        for (path, metadata) in entries {
             if metadata.is_file() {
-                temp_files_name.push(entry.path().trim_start_matches(location_prefix).to_string());
+                temp_files_name.push(path.trim_start_matches(location_prefix).to_string());
 
                 temp_files_last_modified
                     .push(metadata.last_modified().map(|x| x.timestamp_micros()));
@@ -238,6 +237,7 @@ impl TempFilesTable {
 const MAX_BATCH_SIZE: usize = 1000;
 
 pub struct ListerStreamSourceBuilder<T>
 where T: Future<Output = opendal::Result<Lister>> + Send + 'static
 {
+    op: Operator,
     lister_fut: FutureLister<T>,
     limit: Option<usize>,
     chunk_size: usize,
@@ -246,8 +246,9 @@ where T: Future<Output = opendal::Result<Lister>> + Send + 'static
 impl<T> ListerStreamSourceBuilder<T>
 where T: Future<Output = opendal::Result<Lister>> + Send + 'static
 {
-    pub fn with_lister_fut(lister_fut: FutureLister<T>) -> Self {
+    pub fn with_lister_fut(op: Operator, lister_fut: FutureLister<T>) -> Self {
         Self {
+            op,
             lister_fut,
             limit: None,
             chunk_size: MAX_BATCH_SIZE,
@@ -266,9 +267,13 @@ where T: Future<Output = opendal::Result<Lister>> + Send + 'static
 
     pub fn build(
         self,
-        block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
+        block_builder: (impl FnMut(Vec<(String, Metadata)>) -> Result<DataBlock>
+        + Sync
+        + Send
+        + 'static),
     ) -> Result<SendableDataBlockStream> {
         stream_source_from_entry_lister_with_chunk_size(
+            self.op.clone(),
             self.lister_fut,
             self.limit,
             self.chunk_size,
@@ -278,10 +283,11 @@ where T: Future<Output = opendal::Result<Lister>> + Send + 'static
 }
 
 fn stream_source_from_entry_lister_with_chunk_size<T>(
+    op: Operator,
     lister_fut: FutureLister<T>,
     limit: Option<usize>,
     chunk_size: usize,
-    block_builder: (impl FnMut(Vec<Entry>) -> Result<DataBlock> + Sync + Send + 'static),
+    block_builder: (impl FnMut(Vec<(String, Metadata)>) -> Result<DataBlock> + Sync + Send + 'static),
 ) -> Result<SendableDataBlockStream>
 where
     T: Future<Output = opendal::Result<Lister>> + Send + 'static,
@@ -293,9 +299,9 @@ where
 
     let state = ListerState::<T>::Uninitialized(lister_fut);
 
-    let stream = stream::try_unfold(
-        (state, block_builder),
-        move |(mut state, mut builder)| async move {
+    let stream = stream::try_unfold((state, block_builder), move |(mut state, mut builder)| {
+        let op = op.clone();
+        async move {
             let mut lister = {
                 match state {
                     ListerState::Uninitialized(fut) => {
@@ -306,15 +312,23 @@ where
                 }
            };
 
            if let Some(entries) = lister.next().await {
-                let entries = entries.into_iter().collect::<Result<Vec<_>>>()?;
-                let data_block = builder(entries)?;
+                let mut items = Vec::with_capacity(entries.len());
+                for entry in entries {
+                    let (path, mut metadata) = entry?.into_parts();
+                    if metadata.is_file() && metadata.last_modified().is_none() {
+                        metadata = op.stat(&path).await?;
+                    }
+
+                    items.push((path, metadata))
+                }
+
+                let data_block = builder(items)?;
                state = ListerState::Initialized(lister);
                Ok(Some((data_block, (state, builder))))
            } else {
                Ok(None)
            }
-        },
-    );
+        }
+    });
 
     Ok(stream.boxed())
 }
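Taken together, the final hunk drives a chunked lister through `stream::try_unfold`, enriching any entry whose listed metadata is incomplete before handing the chunk to the block builder. A self-contained sketch of that flow with a toy two-page lister (stand-ins throughout; `stat` plays `Operator::stat`; tokio and futures assumed):

```rust
use futures::stream;
use futures::TryStreamExt;

/// Stand-in listing metadata: Some(last_modified_micros) or missing.
type Meta = Option<i64>;

/// Stand-in for Operator::stat.
async fn stat(_path: &str) -> Result<Meta, String> {
    Ok(Some(1_700_000_000))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Two "pages" of (path, listed metadata); the second page is incomplete.
    let pages = vec![
        vec![("a".to_string(), Some(1))],
        vec![("b".to_string(), None)],
    ];

    let st = stream::try_unfold(pages.into_iter(), |mut it| async move {
        let Some(page) = it.next() else {
            return Ok::<_, String>(None);
        };
        let mut items = Vec::with_capacity(page.len());
        for (path, meta) in page {
            // Same fallback as the hunk above: stat only when LIST omitted it.
            let meta = match meta {
                Some(m) => Some(m),
                None => stat(&path).await?,
            };
            items.push((path, meta));
        }
        Ok(Some((items, it)))
    });

    let chunks: Vec<_> = st.try_collect().await?;
    assert_eq!(chunks.len(), 2);
    assert!(chunks[1][0].1.is_some()); // enriched via stat()
    Ok(())
}
```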