Skip to content

Commit 5043b3c

Browse files
authored
chore(query): revert "fix(query): update opendal (#19110)" (#19146)
* revert * revert * revert
1 parent a3fb83a commit 5043b3c

File tree

21 files changed

+184
-565
lines changed

21 files changed

+184
-565
lines changed

Cargo.lock

Lines changed: 64 additions & 377 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -312,13 +312,13 @@ hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio"
312312
lru = "0.12"
313313

314314
## in branch dev
315-
iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c", features = [
315+
iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403", features = [
316316
"storage-all",
317317
] }
318-
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
319-
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
320-
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
321-
iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" }
318+
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
319+
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
320+
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
321+
iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
322322

323323
# Explicitly specify compatible AWS SDK versions
324324
aws-config = "1.5.18"
@@ -367,9 +367,9 @@ num-derive = "0.4.2"
367367
num-traits = "0.2.19"
368368
num_cpus = "1.17"
369369
object = "0.36.5"
370-
object_store_opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef" }
370+
object_store_opendal = { version = "0.54.1" }
371371
once_cell = "1.15.0"
372-
opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef", features = [
372+
opendal = { version = "0.54.1", features = [
373373
"layers-fastrace",
374374
"layers-prometheus-client",
375375
"layers-async-backtrace",
@@ -387,8 +387,6 @@ opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef", feat
387387
"services-webhdfs",
388388
"services-huggingface",
389389
] }
390-
opendal-layer-immutable-index = { git = "https://github.com/apache/opendal.git", rev = "02953ef" }
391-
opendal-layer-observe-metrics-common = { git = "https://github.com/apache/opendal.git", rev = "02953ef" }
392390
openraft = { version = "0.10.0", features = [
393391
"serde",
394392
"tracing-log",

src/bendsave/src/storage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ mod tests {
219219
use std::path::Path;
220220

221221
use databend_common_base::base::tokio;
222-
use databend_common_storage::Scheme;
222+
use opendal::Scheme;
223223

224224
use super::*;
225225

@@ -237,12 +237,12 @@ mod tests {
237237
#[tokio::test]
238238
async fn test_load_epochfs_storage() -> Result<()> {
239239
let op = load_bendsave_storage("s3://bendsave/tmp?region=us-east-1").await?;
240-
assert_eq!(op.info().scheme(), Scheme::S3.to_string());
240+
assert_eq!(op.info().scheme(), Scheme::S3);
241241
assert_eq!(op.info().name(), "bendsave");
242242
assert_eq!(op.info().root(), "/tmp/");
243243

244244
let op = load_bendsave_storage("fs://opt").await?;
245-
assert_eq!(op.info().scheme(), Scheme::Fs.to_string());
245+
assert_eq!(op.info().scheme(), Scheme::Fs);
246246
assert_eq!(op.info().root(), "/opt");
247247
Ok(())
248248
}

src/common/exception/src/exception_into.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,19 +183,12 @@ impl From<std::convert::Infallible> for ErrorCode {
183183

184184
impl From<opendal::Error> for ErrorCode {
185185
fn from(error: opendal::Error) -> Self {
186-
let msg = error.message();
187-
let detail = error.to_string();
188-
let detail = detail
189-
.strip_suffix(msg)
190-
.and_then(|err| err.strip_suffix(" => "))
191-
.unwrap_or(&detail);
192-
193186
match error.kind() {
194-
opendal::ErrorKind::NotFound => ErrorCode::StorageNotFound(msg).add_detail(detail),
187+
opendal::ErrorKind::NotFound => ErrorCode::StorageNotFound(error.to_string()),
195188
opendal::ErrorKind::PermissionDenied => {
196-
ErrorCode::StoragePermissionDenied(msg).add_detail(detail)
189+
ErrorCode::StoragePermissionDenied(error.to_string())
197190
}
198-
_ => ErrorCode::StorageOther(msg).add_detail(detail),
191+
_ => ErrorCode::StorageOther(format!("{error:?}")),
199192
}
200193
}
201194
}

src/common/storage/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ iceberg = { workspace = true }
3434
log = { workspace = true }
3535
lru = { workspace = true }
3636
opendal = { workspace = true }
37-
opendal-layer-immutable-index = { workspace = true }
38-
opendal-layer-observe-metrics-common = { workspace = true }
3937
parquet = { workspace = true }
4038
prometheus-client = { workspace = true }
4139
regex = { workspace = true }

src/common/storage/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ pub use http_client::StorageHttpClient;
4343
mod operator;
4444
pub use operator::DataOperator;
4545
pub use operator::OperatorRegistry;
46-
pub use operator::Scheme;
4746
pub use operator::check_operator;
4847
pub use operator::init_operator;
4948

src/common/storage/src/metrics_layer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ use databend_common_base::runtime::metrics::FamilyHistogram;
2323
use databend_common_base::runtime::metrics::register_counter_family;
2424
use databend_common_base::runtime::metrics::register_gauge_family;
2525
use databend_common_base::runtime::metrics::register_histogram_family;
26+
use opendal::layers::observe;
2627
use opendal::raw::Access;
2728
use opendal::raw::Layer;
28-
use opendal_layer_observe_metrics_common as observe;
2929
use prometheus_client::encoding::EncodeLabel;
3030
use prometheus_client::encoding::EncodeLabelSet;
3131
use prometheus_client::encoding::LabelSetEncoder;

src/common/storage/src/operator.rs

Lines changed: 3 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::env;
1616
use std::io::Error;
1717
use std::io::ErrorKind;
1818
use std::io::Result;
19-
use std::str::FromStr;
2019
use std::sync::LazyLock;
2120
use std::time::Duration;
2221

@@ -53,13 +52,13 @@ use opendal::layers::AsyncBacktraceLayer;
5352
use opendal::layers::ConcurrentLimitLayer;
5453
use opendal::layers::FastraceLayer;
5554
use opendal::layers::HttpClientLayer;
55+
use opendal::layers::ImmutableIndexLayer;
5656
use opendal::layers::LoggingLayer;
5757
use opendal::layers::RetryInterceptor;
5858
use opendal::layers::RetryLayer;
5959
use opendal::layers::TimeoutLayer;
6060
use opendal::raw::HttpClient;
6161
use opendal::services;
62-
use opendal_layer_immutable_index::ImmutableIndexLayer;
6362

6463
use crate::StorageConfig;
6564
use crate::StorageHttpClient;
@@ -404,8 +403,8 @@ fn init_s3_operator(cfg: &StorageS3Config) -> Result<impl Builder> {
404403
.session_token(&cfg.security_token)
405404
.role_arn(&cfg.role_arn)
406405
.external_id(&cfg.external_id)
407-
// Don't enable it otherwise we will get Permission in stat unknown files
408-
// .allow_anonymous()
406+
// It's safe to allow anonymous since opendal will perform the check first.
407+
.allow_anonymous()
409408
// Root.
410409
.root(&cfg.root);
411410

@@ -666,76 +665,3 @@ impl OperatorRegistry for iceberg::io::FileIO {
666665
Ok((file_io.get_operator().clone(), &location[pos..]))
667666
}
668667
}
669-
670-
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
671-
pub enum Scheme {
672-
Azblob,
673-
Gcs,
674-
Hdfs,
675-
Ipfs,
676-
S3,
677-
Oss,
678-
Obs,
679-
Cos,
680-
Http,
681-
Fs,
682-
Webhdfs,
683-
Huggingface,
684-
Custom(&'static str),
685-
}
686-
687-
impl Scheme {
688-
/// Convert self into static str.
689-
pub fn into_static(self) -> &'static str {
690-
self.into()
691-
}
692-
}
693-
694-
impl From<Scheme> for &'static str {
695-
fn from(v: Scheme) -> Self {
696-
match v {
697-
Scheme::Azblob => "azblob",
698-
Scheme::Gcs => "gcs",
699-
Scheme::Hdfs => "hdfs",
700-
Scheme::Ipfs => "ipfs",
701-
Scheme::S3 => "s3",
702-
Scheme::Oss => "oss",
703-
Scheme::Obs => "obs",
704-
Scheme::Cos => "cos",
705-
Scheme::Http => "http",
706-
Scheme::Fs => "fs",
707-
Scheme::Webhdfs => "webhdfs",
708-
Scheme::Huggingface => "huggingface",
709-
Scheme::Custom(s) => s,
710-
}
711-
}
712-
}
713-
714-
impl FromStr for Scheme {
715-
type Err = Error;
716-
717-
fn from_str(s: &str) -> Result<Self> {
718-
let s = s.to_lowercase();
719-
match s.as_str() {
720-
"azblob" => Ok(Scheme::Azblob),
721-
"gcs" => Ok(Scheme::Gcs),
722-
"hdfs" => Ok(Scheme::Hdfs),
723-
"ipfs" => Ok(Scheme::Ipfs),
724-
"s3" | "s3a" => Ok(Scheme::S3),
725-
"oss" => Ok(Scheme::Oss),
726-
"obs" => Ok(Scheme::Obs),
727-
"cos" => Ok(Scheme::Cos),
728-
"http" | "https" => Ok(Scheme::Http),
729-
"fs" => Ok(Scheme::Fs),
730-
"webhdfs" => Ok(Scheme::Webhdfs),
731-
"huggingface" | "hf" => Ok(Scheme::Huggingface),
732-
_ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))),
733-
}
734-
}
735-
}
736-
737-
impl std::fmt::Display for Scheme {
738-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
739-
write!(f, "{}", self.into_static())
740-
}
741-
}

src/common/storage/src/runtime_layer.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -307,18 +307,26 @@ impl<R: oio::List> oio::List for RuntimeIO<R> {
307307
}
308308

309309
impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
310-
async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
311-
self.inner.as_mut().unwrap().delete(path, args).await
310+
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
311+
self.inner.as_mut().unwrap().delete(path, args)
312312
}
313313

314-
async fn close(&mut self) -> Result<()> {
314+
async fn flush(&mut self) -> Result<usize> {
315315
let mut r = self.inner.take().expect("deleter must be valid");
316316
let runtime = self.runtime.clone();
317317

318-
let _ = runtime
319-
.spawn(async move { r.close().await })
318+
let (r, res) = runtime
319+
.try_spawn(
320+
async move {
321+
let res = r.flush().await;
322+
(r, res)
323+
},
324+
Some(self.spawn_task_name.clone()),
325+
)
326+
.expect("spawn must success")
320327
.await
321-
.expect("join must success")?;
322-
Ok(())
328+
.expect("join must success");
329+
self.inner = Some(r);
330+
res
323331
}
324332
}

src/common/storage/src/stage.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,7 @@ impl StageFileInfo {
6060
path,
6161
size: meta.content_length(),
6262
md5: meta.content_md5().map(str::to_string),
63-
last_modified: meta.last_modified().map(|m| {
64-
let ns = m.into_inner().as_nanosecond();
65-
DateTime::from_timestamp_nanos(ns as i64)
66-
}),
63+
last_modified: meta.last_modified(),
6764
etag: meta.etag().map(str::to_string),
6865
status: StageFileStatus::NeedCopy,
6966
creator: None,

0 commit comments

Comments
 (0)