Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
441 changes: 64 additions & 377 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 7 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio"
lru = "0.12"

## in branch dev
iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c", features = [
iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403", features = [
"storage-all",
] }
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }

# Explicitly specify compatible AWS SDK versions
aws-config = "1.5.18"
Expand Down Expand Up @@ -367,9 +367,9 @@ num-derive = "0.4.2"
num-traits = "0.2.19"
num_cpus = "1.17"
object = "0.36.5"
object_store_opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef" }
object_store_opendal = { version = "0.54.1" }
once_cell = "1.15.0"
opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef", features = [
opendal = { version = "0.54.1", features = [
"layers-fastrace",
"layers-prometheus-client",
"layers-async-backtrace",
Expand All @@ -387,8 +387,6 @@ opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef", feat
"services-webhdfs",
"services-huggingface",
] }
opendal-layer-immutable-index = { git = "https://github.com/apache/opendal.git", rev = "02953ef" }
opendal-layer-observe-metrics-common = { git = "https://github.com/apache/opendal.git", rev = "02953ef" }
openraft = { version = "0.10.0", features = [
"serde",
"tracing-log",
Expand Down
6 changes: 3 additions & 3 deletions src/bendsave/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ mod tests {
use std::path::Path;

use databend_common_base::base::tokio;
use databend_common_storage::Scheme;
use opendal::Scheme;

use super::*;

Expand All @@ -237,12 +237,12 @@ mod tests {
#[tokio::test]
async fn test_load_epochfs_storage() -> Result<()> {
let op = load_bendsave_storage("s3://bendsave/tmp?region=us-east-1").await?;
assert_eq!(op.info().scheme(), Scheme::S3.to_string());
assert_eq!(op.info().scheme(), Scheme::S3);
assert_eq!(op.info().name(), "bendsave");
assert_eq!(op.info().root(), "/tmp/");

let op = load_bendsave_storage("fs://opt").await?;
assert_eq!(op.info().scheme(), Scheme::Fs.to_string());
assert_eq!(op.info().scheme(), Scheme::Fs);
assert_eq!(op.info().root(), "/opt");
Ok(())
}
Expand Down
13 changes: 3 additions & 10 deletions src/common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,12 @@ impl From<std::convert::Infallible> for ErrorCode {

impl From<opendal::Error> for ErrorCode {
fn from(error: opendal::Error) -> Self {
let msg = error.message();
let detail = error.to_string();
let detail = detail
.strip_suffix(msg)
.and_then(|err| err.strip_suffix(" => "))
.unwrap_or(&detail);

match error.kind() {
opendal::ErrorKind::NotFound => ErrorCode::StorageNotFound(msg).add_detail(detail),
opendal::ErrorKind::NotFound => ErrorCode::StorageNotFound(error.to_string()),
opendal::ErrorKind::PermissionDenied => {
ErrorCode::StoragePermissionDenied(msg).add_detail(detail)
ErrorCode::StoragePermissionDenied(error.to_string())
}
_ => ErrorCode::StorageOther(msg).add_detail(detail),
_ => ErrorCode::StorageOther(format!("{error:?}")),
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ iceberg = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
opendal = { workspace = true }
opendal-layer-immutable-index = { workspace = true }
opendal-layer-observe-metrics-common = { workspace = true }
parquet = { workspace = true }
prometheus-client = { workspace = true }
regex = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion src/common/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub use http_client::StorageHttpClient;
mod operator;
pub use operator::DataOperator;
pub use operator::OperatorRegistry;
pub use operator::Scheme;
pub use operator::check_operator;
pub use operator::init_operator;

Expand Down
2 changes: 1 addition & 1 deletion src/common/storage/src/metrics_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use databend_common_base::runtime::metrics::FamilyHistogram;
use databend_common_base::runtime::metrics::register_counter_family;
use databend_common_base::runtime::metrics::register_gauge_family;
use databend_common_base::runtime::metrics::register_histogram_family;
use opendal::layers::observe;
use opendal::raw::Access;
use opendal::raw::Layer;
use opendal_layer_observe_metrics_common as observe;
use prometheus_client::encoding::EncodeLabel;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::encoding::LabelSetEncoder;
Expand Down
80 changes: 3 additions & 77 deletions src/common/storage/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::env;
use std::io::Error;
use std::io::ErrorKind;
use std::io::Result;
use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;

Expand Down Expand Up @@ -53,13 +52,13 @@ use opendal::layers::AsyncBacktraceLayer;
use opendal::layers::ConcurrentLimitLayer;
use opendal::layers::FastraceLayer;
use opendal::layers::HttpClientLayer;
use opendal::layers::ImmutableIndexLayer;
use opendal::layers::LoggingLayer;
use opendal::layers::RetryInterceptor;
use opendal::layers::RetryLayer;
use opendal::layers::TimeoutLayer;
use opendal::raw::HttpClient;
use opendal::services;
use opendal_layer_immutable_index::ImmutableIndexLayer;

use crate::StorageConfig;
use crate::StorageHttpClient;
Expand Down Expand Up @@ -404,8 +403,8 @@ fn init_s3_operator(cfg: &StorageS3Config) -> Result<impl Builder> {
.session_token(&cfg.security_token)
.role_arn(&cfg.role_arn)
.external_id(&cfg.external_id)
// Don't enable it otherwise we will get Permission in stat unknown files
// .allow_anonymous()
// It's safe to allow anonymous since opendal will perform the check first.
.allow_anonymous()
// Root.
.root(&cfg.root);

Expand Down Expand Up @@ -666,76 +665,3 @@ impl OperatorRegistry for iceberg::io::FileIO {
Ok((file_io.get_operator().clone(), &location[pos..]))
}
}

/// Storage scheme understood by the operator layer.
///
/// Covers every backend the code base routes by scheme name; any
/// unrecognized name is preserved verbatim in `Custom`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Scheme {
    Azblob,
    Gcs,
    Hdfs,
    Ipfs,
    S3,
    Oss,
    Obs,
    Cos,
    Http,
    Fs,
    Webhdfs,
    Huggingface,
    /// Any scheme not listed above, carried as its lowercase name.
    Custom(&'static str),
}

impl Scheme {
    /// Convert self into static str.
    pub fn into_static(self) -> &'static str {
        self.into()
    }
}

impl From<Scheme> for &'static str {
    /// Canonical lowercase name of the scheme (e.g. `Scheme::S3` -> `"s3"`).
    fn from(v: Scheme) -> Self {
        match v {
            Scheme::Azblob => "azblob",
            Scheme::Gcs => "gcs",
            Scheme::Hdfs => "hdfs",
            Scheme::Ipfs => "ipfs",
            Scheme::S3 => "s3",
            Scheme::Oss => "oss",
            Scheme::Obs => "obs",
            Scheme::Cos => "cos",
            Scheme::Http => "http",
            Scheme::Fs => "fs",
            Scheme::Webhdfs => "webhdfs",
            Scheme::Huggingface => "huggingface",
            Scheme::Custom(s) => s,
        }
    }
}

impl std::str::FromStr for Scheme {
    type Err = Error;

    /// Parse a scheme name case-insensitively.
    ///
    /// Never fails: unknown names become `Scheme::Custom`. Custom names are
    /// interned in a process-wide set so that parsing the same unknown
    /// scheme repeatedly leaks its backing string at most once (previously
    /// every call leaked a fresh allocation via `Box::leak`).
    fn from_str(s: &str) -> Result<Self> {
        let s = s.to_lowercase();
        match s.as_str() {
            "azblob" => Ok(Scheme::Azblob),
            "gcs" => Ok(Scheme::Gcs),
            "hdfs" => Ok(Scheme::Hdfs),
            "ipfs" => Ok(Scheme::Ipfs),
            // `s3a` is the Hadoop-style alias for S3.
            "s3" | "s3a" => Ok(Scheme::S3),
            "oss" => Ok(Scheme::Oss),
            "obs" => Ok(Scheme::Obs),
            "cos" => Ok(Scheme::Cos),
            "http" | "https" => Ok(Scheme::Http),
            "fs" => Ok(Scheme::Fs),
            "webhdfs" => Ok(Scheme::Webhdfs),
            "huggingface" | "hf" => Ok(Scheme::Huggingface),
            _ => Ok(Scheme::Custom(intern_custom_scheme(s))),
        }
    }
}

/// Intern an unknown scheme name, returning a `'static` reference.
///
/// Each distinct name is leaked exactly once; later lookups reuse the
/// previously leaked string instead of leaking a new copy per call.
fn intern_custom_scheme(s: String) -> &'static str {
    static INTERNED: std::sync::LazyLock<
        std::sync::Mutex<std::collections::HashSet<&'static str>>,
    > = std::sync::LazyLock::new(|| std::sync::Mutex::new(std::collections::HashSet::new()));

    let mut set = INTERNED.lock().expect("scheme intern set poisoned");
    match set.get(s.as_str()) {
        Some(v) => *v,
        None => {
            let leaked: &'static str = Box::leak(s.into_boxed_str());
            set.insert(leaked);
            leaked
        }
    }
}

impl std::fmt::Display for Scheme {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.into_static())
    }
}
22 changes: 15 additions & 7 deletions src/common/storage/src/runtime_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,18 +307,26 @@ impl<R: oio::List> oio::List for RuntimeIO<R> {
}

impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.as_mut().unwrap().delete(path, args).await
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.as_mut().unwrap().delete(path, args)
}

async fn close(&mut self) -> Result<()> {
async fn flush(&mut self) -> Result<usize> {
let mut r = self.inner.take().expect("deleter must be valid");
let runtime = self.runtime.clone();

let _ = runtime
.spawn(async move { r.close().await })
let (r, res) = runtime
.try_spawn(
async move {
let res = r.flush().await;
(r, res)
},
Some(self.spawn_task_name.clone()),
)
.expect("spawn must success")
.await
.expect("join must success")?;
Ok(())
.expect("join must success");
self.inner = Some(r);
res
}
}
5 changes: 1 addition & 4 deletions src/common/storage/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ impl StageFileInfo {
path,
size: meta.content_length(),
md5: meta.content_md5().map(str::to_string),
last_modified: meta.last_modified().map(|m| {
let ns = m.into_inner().as_nanosecond();
DateTime::from_timestamp_nanos(ns as i64)
}),
last_modified: meta.last_modified(),
etag: meta.etag().map(str::to_string),
status: StageFileStatus::NeedCopy,
creator: None,
Expand Down
28 changes: 9 additions & 19 deletions src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use log::info;
use opendal::Entry;
use opendal::ErrorKind;
use opendal::Operator;
use opendal::Scheme;
use uuid::Version;

/// An assumption of the maximum duration from the time the first block is written to the time the
Expand Down Expand Up @@ -485,7 +486,7 @@ async fn list_until_prefix(
let dal = fuse_table.get_operator_ref();

match dal.info().scheme() {
"fs" => fs_list_until_prefix(dal, path, until, need_one_more, gc_root_meta_ts).await,
Scheme::Fs => fs_list_until_prefix(dal, path, until, need_one_more, gc_root_meta_ts).await,
_ => general_list_until_prefix(dal, path, until, need_one_more, gc_root_meta_ts).await,
}
}
Expand Down Expand Up @@ -586,9 +587,6 @@ async fn is_gc_candidate_segment_block(
})?
};

let last_modified =
DateTime::from_timestamp_nanos(last_modified.into_inner().as_nanosecond() as i64);

Ok(last_modified + ASSUMPTION_MAX_TXN_DURATION < gc_root_meta_ts)
}

Expand Down Expand Up @@ -676,15 +674,12 @@ async fn select_gc_root(
let gc_root = read_snapshot_from_location(fuse_table, &gc_root_path).await;

let gc_root_meta_ts = match dal.stat(&gc_root_path).await {
Ok(v) => v
.last_modified()
.ok_or_else(|| {
ErrorCode::StorageOther(format!(
"Failed to get `last_modified` metadata of the gc root object '{}'",
gc_root_path
))
})
.map(|v| DateTime::from_timestamp_nanos(v.into_inner().as_nanosecond() as i64))?,
Ok(v) => v.last_modified().ok_or_else(|| {
ErrorCode::StorageOther(format!(
"Failed to get `last_modified` metadata of the gc root object '{}'",
gc_root_path
))
})?,
Err(e) => {
return if e.kind() == ErrorKind::NotFound {
// Concurrent vacuum, ignore it
Expand Down Expand Up @@ -716,13 +711,8 @@ async fn select_gc_root(
gc_root_path
))
})?,
Some(v) => v
Some(v) => v,
};

let last_modified = DateTime::from_timestamp_nanos(
last_modified.into_inner().as_nanosecond() as i64,
);

if last_modified + ASSUMPTION_MAX_TXN_DURATION < gc_root_meta_ts {
gc_candidates.push(path.to_owned());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn vacuum_by_duration(
let meta = meta.unwrap();

if let Some(modified) = meta.last_modified() {
if timestamp - modified.into_inner().as_millisecond() < expire_time {
if timestamp - modified.timestamp_millis() < expire_time {
continue;
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(clippy::let_and_return)]
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -298,15 +297,17 @@ mod test_accessor {
}

impl oio::Delete for MockDeleter {
async fn delete(&mut self, _path: &str, _args: OpDelete) -> opendal::Result<()> {
fn delete(&mut self, _path: &str, _args: OpDelete) -> opendal::Result<()> {
self.size += 1;
Ok(())
}

async fn close(&mut self) -> opendal::Result<()> {
async fn flush(&mut self) -> opendal::Result<usize> {
self.hit_batch.store(true, Ordering::Release);

let n = self.size;
self.size = 0;
Ok(())
Ok(n)
}
}

Expand Down Expand Up @@ -900,10 +901,7 @@ async fn test_vacuum_drop_create_or_replace_impl(vacuum_stmts: &[&str]) -> Resul
async fn new_local_meta() -> MetaStore {
let version = &BUILD_INFO;
let meta_config = MetaConfig::default();
let meta = {
let config = meta_config.to_meta_grpc_client_conf(version);
let provider = Arc::new(MetaStoreProvider::new(config));
provider.create_meta_store().await.unwrap()
};
meta
let config = meta_config.to_meta_grpc_client_conf(version);
let provider = MetaStoreProvider::new(config);
provider.create_meta_store().await.unwrap()
}
2 changes: 1 addition & 1 deletion src/query/service/src/history_tables/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::BTreeMap;

use databend_common_meta_app::storage::StorageParams;
use databend_common_storage::Scheme;
use opendal::Scheme;
use opendal::raw::normalize_root;

#[derive(Debug)]
Expand Down
Loading
Loading