diff --git a/Cargo.lock b/Cargo.lock index 1982d08c8e567..939b93a58c91e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,6 +144,12 @@ version = "0.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9d4ee0d472d1cd2e28c97dfa124b3d8d992e10eb0a035f33f5d12e3a177ba3b" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -214,9 +220,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.100" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" dependencies = [ "backtrace", ] @@ -1526,9 +1532,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.6.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +checksum = "fd0b50b1b78dbadd44ab18b3c794e496f3a139abb9fbc27d9c94c4eebbb96496" dependencies = [ "fastrand", "gloo-timers", @@ -1596,7 +1602,7 @@ version = "0.1.0" dependencies = [ "arrow 56.2.0", "arrow-schema 56.2.0", - "ctor 0.2.9", + "ctor", "databend-common-base", "databend-common-catalog", "databend-common-config", @@ -1683,7 +1689,7 @@ dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.12.1", "lazy_static", "lazycell", "log", @@ -2305,16 +2311,17 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.42" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ + "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "serde", "wasm-bindgen", - "windows-link 0.2.1", + "windows-link 0.1.1", ] [[package]] @@ -3120,22 +3127,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "ctor" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "424e0138278faeb2b401f174ad17e715c829512d74f3d1e81eb43365c2e0590e" -dependencies = [ - "ctor-proc-macro", - "dtor", -] - -[[package]] -name = "ctor-proc-macro" -version = "0.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52560adf09603e58c9a7ee1fe1dcb95a16927b17c127f0ac02d6e768a0e25bc1" - [[package]] name = "ctr" version = "0.9.2" @@ -3738,7 +3729,7 @@ dependencies = [ "bumpalo", "comfy-table", "crc32fast", - "ctor 0.2.9", + "ctor", "databend-common-ast", "databend-common-base", "databend-common-column", @@ -4468,7 +4459,7 @@ dependencies = [ "chrono-tz 0.8.6", "cidr", "cron", - "ctor 0.2.9", + "ctor", "dashmap 6.1.0", "databend-common-ast", "databend-common-base", @@ -4549,8 +4540,6 @@ dependencies = [ "log", "lru", "opendal", - "opendal-layer-immutable-index", - "opendal-layer-observe-metrics-common", "parquet 56.2.0", "prometheus-client 0.22.3", "regex", @@ -5493,7 +5482,7 @@ dependencies = [ "chrono-tz 0.8.6", "concurrent-queue", "cron", - "ctor 0.2.9", + "ctor", "dashmap 6.1.0", "databend-common-ast", "databend-common-base", @@ -6351,21 +6340,6 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04" -[[package]] -name = "dtor" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "404d02eeb088a82cfd873006cb713fe411306c7d182c344905e101fb1167d301" -dependencies = [ - "dtor-proc-macro", -] - -[[package]] -name = "dtor-proc-macro" -version = "0.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f678cf4a922c215c63e0de95eb1ff08a958a81d47e485cf9da1e27bf6305cfa5" - [[package]] name = "dtparse" version = "2.0.0" @@ -9087,7 +9061,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.57.0", + "windows-core 0.61.0", ] [[package]] @@ -9102,7 +9076,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.4.0" -source = "git+https://github.com/databendlabs/iceberg-rust?rev=6536f9c#6536f9ccac1ce56a05ad2e738a2fa6760dea8cb5" +source = "git+https://github.com/databendlabs/iceberg-rust?rev=32b1403#32b1403eef8b00d7f2a526c551aa35b8fc31927e" dependencies = [ "anyhow", "apache-avro", @@ -9151,7 +9125,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.4.0" -source = "git+https://github.com/databendlabs/iceberg-rust?rev=6536f9c#6536f9ccac1ce56a05ad2e738a2fa6760dea8cb5" +source = "git+https://github.com/databendlabs/iceberg-rust?rev=32b1403#32b1403eef8b00d7f2a526c551aa35b8fc31927e" dependencies = [ "anyhow", "async-trait", @@ -9168,7 +9142,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-hms" version = "0.4.0" -source = "git+https://github.com/databendlabs/iceberg-rust?rev=6536f9c#6536f9ccac1ce56a05ad2e738a2fa6760dea8cb5" +source = "git+https://github.com/databendlabs/iceberg-rust?rev=32b1403#32b1403eef8b00d7f2a526c551aa35b8fc31927e" dependencies = [ "anyhow", "async-trait", @@ -9192,7 +9166,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.4.0" -source = "git+https://github.com/databendlabs/iceberg-rust?rev=6536f9c#6536f9ccac1ce56a05ad2e738a2fa6760dea8cb5" +source = "git+https://github.com/databendlabs/iceberg-rust?rev=32b1403#32b1403eef8b00d7f2a526c551aa35b8fc31927e" dependencies = [ "async-trait", "chrono", @@ -9212,7 +9186,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-s3tables" version = "0.4.0" -source = "git+https://github.com/databendlabs/iceberg-rust?rev=6536f9c#6536f9ccac1ce56a05ad2e738a2fa6760dea8cb5" +source = "git+https://github.com/databendlabs/iceberg-rust?rev=32b1403#32b1403eef8b00d7f2a526c551aa35b8fc31927e" dependencies = [ "anyhow", "async-trait", @@ -9768,12 +9742,10 @@ dependencies = [ "jiff-static", "jiff-tzdb", "jiff-tzdb-platform", - "js-sys", "log", "portable-atomic", "portable-atomic-util", "serde_core", - "wasm-bindgen", "windows-sys 0.61.2", ] @@ -10543,15 +10515,6 @@ dependencies = [ "digest", ] -[[package]] -name = "mea" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1a78f54a189049e2f554d43d2021e3010036ed65a8f5376ab12cc0432d9a341" -dependencies = [ - "slab", -] - [[package]] name = "measure_time" version = "0.9.0" @@ -11306,14 +11269,13 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" +version = "0.54.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0b88fc0e0c4890c1d99e2b8c519c5db40f7d9b69a0f562ff1ad4967a4c8bbc6" dependencies = [ "async-trait", "bytes", - "chrono", "futures", - "mea", "object_store", "opendal", "pin-project", @@ -11344,244 +11306,37 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendal" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "opendal-core", - "opendal-layer-async-backtrace", - "opendal-layer-fastrace", - "opendal-layer-prometheus-client", - "opendal-service-azblob", - "opendal-service-azdls", - "opendal-service-gcs", - "opendal-service-ipfs", - "opendal-service-moka", - "opendal-service-obs", - "opendal-service-oss", - "opendal-service-s3", -] - -[[package]] -name = "opendal-core" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" +version = "0.54.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42afda58fa2cf50914402d132cc1caacff116a85d10c72ab2082bb7c50021754" dependencies = [ "anyhow", + "async-backtrace", "backon", "base64 0.22.1", "bytes", - "ctor 0.6.3", + "chrono", + "crc32c", + "fastrace", "futures", "getrandom 0.2.16", "hdrs", "http 1.3.1", "http-body 1.0.1", - "jiff", "log", "md-5", - "mea", + "moka", "percent-encoding", + "prometheus-client 0.23.1", + "prost", "quick-xml 0.38.4", "reqsign", "reqwest", "serde", "serde_json", - "tokio", - "url", - "uuid", - "web-time", -] - -[[package]] -name = "opendal-layer-async-backtrace" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "async-backtrace", - "opendal-core", -] - -[[package]] -name = "opendal-layer-fastrace" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "fastrace", - "opendal-core", -] - -[[package]] -name = "opendal-layer-immutable-index" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "opendal-core", -] - -[[package]] -name = "opendal-layer-observe-metrics-common" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "futures", - "http 1.3.1", - "opendal-core", -] - -[[package]] -name = "opendal-layer-prometheus-client" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "opendal-core", - "opendal-layer-observe-metrics-common", - "prometheus-client 0.24.0", -] - -[[package]] -name = "opendal-service-azblob" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "base64 0.22.1", - "bytes", - "ctor 0.6.3", - "http 1.3.1", - "log", - "opendal-core", - "opendal-service-azure-common", - "quick-xml 0.38.4", - "reqsign", - "serde", "sha2", - "uuid", -] - -[[package]] -name = "opendal-service-azdls" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "bytes", - "ctor 0.6.3", - "http 1.3.1", - "log", - "opendal-core", - "opendal-service-azure-common", - "quick-xml 0.38.4", - "reqsign", - "serde", - "serde_json", -] - -[[package]] -name = "opendal-service-azure-common" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "http 1.3.1", - "opendal-core", - "reqsign", -] - -[[package]] -name = "opendal-service-gcs" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "backon", - "base64 0.22.1", - "bytes", - "ctor 0.6.3", - "http 1.3.1", - "log", - "opendal-core", - "percent-encoding", - "quick-xml 0.38.4", - "reqsign", - "reqwest", - "serde", - "serde_json", "tokio", -] - -[[package]] -name = "opendal-service-ipfs" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "bytes", - "ctor 0.6.3", - "http 1.3.1", - "log", - "opendal-core", - "prost", - "serde", -] - -[[package]] -name = "opendal-service-moka" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "ctor 0.6.3", - "log", - "moka", - "opendal-core", - "serde", -] - -[[package]] -name = "opendal-service-obs" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "bytes", - "ctor 0.6.3", - "http 1.3.1", - "log", - "opendal-core", - "quick-xml 0.38.4", - "reqsign", - "serde", -] - -[[package]] -name = "opendal-service-oss" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "bytes", - "ctor 0.6.3", - "http 1.3.1", - "log", - "opendal-core", - "quick-xml 0.38.4", - "reqsign", - "reqwest", - "serde", -] - -[[package]] -name = "opendal-service-s3" -version = "0.55.0" -source = "git+https://github.com/apache/opendal.git?rev=02953ef#02953ef90c475eb592596f1a0b68370188a80128" -dependencies = [ - "base64 0.22.1", - "bytes", - "crc32c", - "ctor 0.6.3", - "http 1.3.1", - "log", - "md-5", - "opendal-core", - "quick-xml 0.38.4", - "reqsign-aws-v4", - "reqsign-core", - "reqsign-file-read-tokio", - "reqsign-http-send-reqwest", - "reqwest", - "serde", + "uuid", ] [[package]] @@ -12763,19 +12518,19 @@ dependencies = [ "dtoa", "itoa", "parking_lot 0.12.3", - "prometheus-client-derive-encode 0.4.2", + "prometheus-client-derive-encode", ] [[package]] name = "prometheus-client" -version = "0.24.0" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4500adecd7af8e0e9f4dbce15cfee07ce913fbf6ad605cc468b83f2d531ee94" +checksum = "cf41c1a7c32ed72abe5082fb19505b969095c12da9f5732a4bc9878757fd087c" dependencies = [ "dtoa", "itoa", "parking_lot 0.12.3", - "prometheus-client-derive-encode 0.5.0", + "prometheus-client-derive-encode", ] [[package]] @@ -12789,17 +12544,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "prometheus-client-derive-encode" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9adf1691c04c0a5ff46ff8f262b58beb07b0dbb61f96f9f54f6cbd82106ed87f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.106", -] - [[package]] name = "prometheus-parse" version = "0.2.5" @@ -12845,7 +12589,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -12865,7 +12609,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.106", @@ -13137,9 +12881,9 @@ dependencies = [ [[package]] name = "python3-dll-a" -version = "0.2.14" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d381ef313ae70b4da5f95f8a4de773c6aa5cd28f73adec4b4a31df70b66780d8" +checksum = "49fe4227a288cf9493942ad0220ea3f185f4d1f2a14f197f7344d6d02f4ed4ed" dependencies = [ "cc", ] @@ -13159,6 +12903,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quick-xml" version = "0.38.4" @@ -13637,88 +13391,18 @@ dependencies = [ "log", "once_cell", "percent-encoding", + "quick-xml 0.37.5", "rand 0.8.5", "reqwest", "rsa", - "serde", - "serde_json", - "sha1", - "sha2", -] - -[[package]] -name = "reqsign-aws-v4" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4510c2a3e42b653cf788d560a3d54b0ae4cc315a62aaba773554f18319c0db0b" -dependencies = [ - "anyhow", - "async-trait", - "bytes", - "form_urlencoded", - "http 1.3.1", - "log", - "percent-encoding", - "quick-xml 0.38.4", - "reqsign-core", "rust-ini", "serde", "serde_json", - "serde_urlencoded", - "sha1", -] - -[[package]] -name = "reqsign-core" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39da118ccf3bdb067ac6cc40136fec99bc5ba418cbd388dc88e4ce0e5d0b1423" -dependencies = [ - "anyhow", - "async-trait", - "base64 0.22.1", - "bytes", - "form_urlencoded", - "hex", - "hmac", - "http 1.3.1", - "jiff", - "log", - "percent-encoding", "sha1", "sha2", - "windows-sys 0.61.2", -] - -[[package]] -name = "reqsign-file-read-tokio" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "669ea66036266a9ac371d2e63cc7d345e69994da0168b4e6f3487fe21e126f76" -dependencies = [ - "anyhow", - "async-trait", - "reqsign-core", "tokio", ] -[[package]] -name = "reqsign-http-send-reqwest" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46186bce769674f9200ad01af6f2ca42de3e819ddc002fff1edae135bfb6cd9c" -dependencies = [ - "anyhow", - "async-trait", - "bytes", - "futures-channel", - "http 1.3.1", - "http-body-util", - "reqsign-core", - "reqwest", - "wasm-bindgen-futures", -] - [[package]] name = "reqwest" version = "0.12.24" @@ -14879,9 +14563,12 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.11" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] [[package]] name = "sled" @@ -17820,7 +17507,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e16e8641180a5..d92dd1f9bed60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -312,13 +312,13 @@ hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio" lru = "0.12" ## in branch dev -iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c", features = [ +iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403", features = [ "storage-all", ] } -iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" } -iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" } -iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" } -iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6536f9c" } +iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" } +iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" } +iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" } +iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" } # Explicitly specify compatible AWS SDK versions aws-config = "1.5.18" @@ -367,9 +367,9 @@ num-derive = "0.4.2" num-traits = "0.2.19" num_cpus = "1.17" object = "0.36.5" -object_store_opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef" } +object_store_opendal = { version = "0.54.1" } once_cell = "1.15.0" -opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef", features = [ +opendal = { version = "0.54.1", features = [ "layers-fastrace", "layers-prometheus-client", "layers-async-backtrace", @@ -387,8 +387,6 @@ opendal = { git = "https://github.com/apache/opendal.git", rev = "02953ef", feat "services-webhdfs", "services-huggingface", ] } -opendal-layer-immutable-index = { git = "https://github.com/apache/opendal.git", rev = "02953ef" } -opendal-layer-observe-metrics-common = { git = "https://github.com/apache/opendal.git", rev = "02953ef" } openraft = { version = "0.10.0", features = [ "serde", "tracing-log", diff --git a/src/bendsave/src/storage.rs b/src/bendsave/src/storage.rs index d9affd2871989..059aa1e4d60fe 100644 --- a/src/bendsave/src/storage.rs +++ b/src/bendsave/src/storage.rs @@ -219,7 +219,7 @@ mod tests { use std::path::Path; use databend_common_base::base::tokio; - use databend_common_storage::Scheme; + use opendal::Scheme; use super::*; @@ -237,12 +237,12 @@ mod tests { #[tokio::test] async fn test_load_epochfs_storage() -> Result<()> { let op = load_bendsave_storage("s3://bendsave/tmp?region=us-east-1").await?; - assert_eq!(op.info().scheme(), Scheme::S3.to_string()); + assert_eq!(op.info().scheme(), Scheme::S3); assert_eq!(op.info().name(), "bendsave"); assert_eq!(op.info().root(), "/tmp/"); let op = load_bendsave_storage("fs://opt").await?; - assert_eq!(op.info().scheme(), Scheme::Fs.to_string()); + assert_eq!(op.info().scheme(), Scheme::Fs); assert_eq!(op.info().root(), "/opt"); Ok(()) } diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index 4945fb7786a2e..daf06e7a9ecb1 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -183,19 +183,12 @@ impl From for ErrorCode { impl From for ErrorCode { fn from(error: opendal::Error) -> Self { - let msg = error.message(); - let detail = error.to_string(); - let detail = detail - .strip_suffix(msg) - .and_then(|err| err.strip_suffix(" => ")) - .unwrap_or(&detail); - match error.kind() { - opendal::ErrorKind::NotFound => ErrorCode::StorageNotFound(msg).add_detail(detail), + opendal::ErrorKind::NotFound => ErrorCode::StorageNotFound(error.to_string()), opendal::ErrorKind::PermissionDenied => { - ErrorCode::StoragePermissionDenied(msg).add_detail(detail) + ErrorCode::StoragePermissionDenied(error.to_string()) } - _ => ErrorCode::StorageOther(msg).add_detail(detail), + _ => ErrorCode::StorageOther(format!("{error:?}")), } } } diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml index 8f4a9854e01d3..4f06485f6796b 100644 --- a/src/common/storage/Cargo.toml +++ b/src/common/storage/Cargo.toml @@ -34,8 +34,6 @@ iceberg = { workspace = true } log = { workspace = true } lru = { workspace = true } opendal = { workspace = true } -opendal-layer-immutable-index = { workspace = true } -opendal-layer-observe-metrics-common = { workspace = true } parquet = { workspace = true } prometheus-client = { workspace = true } regex = { workspace = true } diff --git a/src/common/storage/src/lib.rs b/src/common/storage/src/lib.rs index d04d0dd66e9db..04fcd82ca6647 100644 --- a/src/common/storage/src/lib.rs +++ b/src/common/storage/src/lib.rs @@ -43,7 +43,6 @@ pub use http_client::StorageHttpClient; mod operator; pub use operator::DataOperator; pub use operator::OperatorRegistry; -pub use operator::Scheme; pub use operator::check_operator; pub use operator::init_operator; diff --git a/src/common/storage/src/metrics_layer.rs b/src/common/storage/src/metrics_layer.rs index 78d4e3831583a..1719bea9beb4a 100644 --- a/src/common/storage/src/metrics_layer.rs +++ b/src/common/storage/src/metrics_layer.rs @@ -23,9 +23,9 @@ use databend_common_base::runtime::metrics::FamilyHistogram; use databend_common_base::runtime::metrics::register_counter_family; use databend_common_base::runtime::metrics::register_gauge_family; use databend_common_base::runtime::metrics::register_histogram_family; +use opendal::layers::observe; use opendal::raw::Access; use opendal::raw::Layer; -use opendal_layer_observe_metrics_common as observe; use prometheus_client::encoding::EncodeLabel; use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::encoding::LabelSetEncoder; diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index e7e8adcd3961b..301eaabd06c46 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -16,7 +16,6 @@ use std::env; use std::io::Error; use std::io::ErrorKind; use std::io::Result; -use std::str::FromStr; use std::sync::LazyLock; use std::time::Duration; @@ -53,13 +52,13 @@ use opendal::layers::AsyncBacktraceLayer; use opendal::layers::ConcurrentLimitLayer; use opendal::layers::FastraceLayer; use opendal::layers::HttpClientLayer; +use opendal::layers::ImmutableIndexLayer; use opendal::layers::LoggingLayer; use opendal::layers::RetryInterceptor; use opendal::layers::RetryLayer; use opendal::layers::TimeoutLayer; use opendal::raw::HttpClient; use opendal::services; -use opendal_layer_immutable_index::ImmutableIndexLayer; use crate::StorageConfig; use crate::StorageHttpClient; @@ -404,8 +403,8 @@ fn init_s3_operator(cfg: &StorageS3Config) -> Result { .session_token(&cfg.security_token) .role_arn(&cfg.role_arn) .external_id(&cfg.external_id) - // Don't enable it otherwise we will get Permission in stat unknown files - // .allow_anonymous() + // It's safe to allow anonymous since opendal will perform the check first. + .allow_anonymous() // Root. .root(&cfg.root); @@ -666,76 +665,3 @@ impl OperatorRegistry for iceberg::io::FileIO { Ok((file_io.get_operator().clone(), &location[pos..])) } } - -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub enum Scheme { - Azblob, - Gcs, - Hdfs, - Ipfs, - S3, - Oss, - Obs, - Cos, - Http, - Fs, - Webhdfs, - Huggingface, - Custom(&'static str), -} - -impl Scheme { - /// Convert self into static str. - pub fn into_static(self) -> &'static str { - self.into() - } -} - -impl From for &'static str { - fn from(v: Scheme) -> Self { - match v { - Scheme::Azblob => "azblob", - Scheme::Gcs => "gcs", - Scheme::Hdfs => "hdfs", - Scheme::Ipfs => "ipfs", - Scheme::S3 => "s3", - Scheme::Oss => "oss", - Scheme::Obs => "obs", - Scheme::Cos => "cos", - Scheme::Http => "http", - Scheme::Fs => "fs", - Scheme::Webhdfs => "webhdfs", - Scheme::Huggingface => "huggingface", - Scheme::Custom(s) => s, - } - } -} - -impl FromStr for Scheme { - type Err = Error; - - fn from_str(s: &str) -> Result { - let s = s.to_lowercase(); - match s.as_str() { - "azblob" => Ok(Scheme::Azblob), - "gcs" => Ok(Scheme::Gcs), - "hdfs" => Ok(Scheme::Hdfs), - "ipfs" => Ok(Scheme::Ipfs), - "s3" | "s3a" => Ok(Scheme::S3), - "oss" => Ok(Scheme::Oss), - "obs" => Ok(Scheme::Obs), - "cos" => Ok(Scheme::Cos), - "http" | "https" => Ok(Scheme::Http), - "fs" => Ok(Scheme::Fs), - "webhdfs" => Ok(Scheme::Webhdfs), - "huggingface" | "hf" => Ok(Scheme::Huggingface), - _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), - } - } -} - -impl std::fmt::Display for Scheme { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.into_static()) - } -} diff --git a/src/common/storage/src/runtime_layer.rs b/src/common/storage/src/runtime_layer.rs index 64b4b025a074d..578006e41a768 100644 --- a/src/common/storage/src/runtime_layer.rs +++ b/src/common/storage/src/runtime_layer.rs @@ -307,18 +307,26 @@ impl oio::List for RuntimeIO { } impl oio::Delete for RuntimeIO { - async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.as_mut().unwrap().delete(path, args).await + fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.as_mut().unwrap().delete(path, args) } - async fn close(&mut self) -> Result<()> { + async fn flush(&mut self) -> Result { let mut r = self.inner.take().expect("deleter must be valid"); let runtime = self.runtime.clone(); - let _ = runtime - .spawn(async move { r.close().await }) + let (r, res) = runtime + .try_spawn( + async move { + let res = r.flush().await; + (r, res) + }, + Some(self.spawn_task_name.clone()), + ) + .expect("spawn must success") .await - .expect("join must success")?; - Ok(()) + .expect("join must success"); + self.inner = Some(r); + res } } diff --git a/src/common/storage/src/stage.rs b/src/common/storage/src/stage.rs index 5a9c1137fb004..02a1a3e29e323 100644 --- a/src/common/storage/src/stage.rs +++ b/src/common/storage/src/stage.rs @@ -60,10 +60,7 @@ impl StageFileInfo { path, size: meta.content_length(), md5: meta.content_md5().map(str::to_string), - last_modified: meta.last_modified().map(|m| { - let ns = m.into_inner().as_nanosecond(); - DateTime::from_timestamp_nanos(ns as i64) - }), + last_modified: meta.last_modified(), etag: meta.etag().map(str::to_string), status: StageFileStatus::NeedCopy, creator: None, diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs index c3c659c720724..510db7c43161f 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs @@ -48,6 +48,7 @@ use log::info; use opendal::Entry; use opendal::ErrorKind; use opendal::Operator; +use opendal::Scheme; use uuid::Version; /// An assumption of the maximum duration from the time the first block is written to the time the @@ -485,7 +486,7 @@ async fn list_until_prefix( let dal = fuse_table.get_operator_ref(); match dal.info().scheme() { - "fs" => fs_list_until_prefix(dal, path, until, need_one_more, gc_root_meta_ts).await, + Scheme::Fs => fs_list_until_prefix(dal, path, until, need_one_more, gc_root_meta_ts).await, _ => general_list_until_prefix(dal, path, until, need_one_more, gc_root_meta_ts).await, } } @@ -586,9 +587,6 @@ async fn is_gc_candidate_segment_block( })? }; - let last_modified = - DateTime::from_timestamp_nanos(last_modified.into_inner().as_nanosecond() as i64); - Ok(last_modified + ASSUMPTION_MAX_TXN_DURATION < gc_root_meta_ts) } @@ -676,15 +674,12 @@ async fn select_gc_root( let gc_root = read_snapshot_from_location(fuse_table, &gc_root_path).await; let gc_root_meta_ts = match dal.stat(&gc_root_path).await { - Ok(v) => v - .last_modified() - .ok_or_else(|| { - ErrorCode::StorageOther(format!( - "Failed to get `last_modified` metadata of the gc root object '{}'", - gc_root_path - )) - }) - .map(|v| DateTime::from_timestamp_nanos(v.into_inner().as_nanosecond() as i64))?, + Ok(v) => v.last_modified().ok_or_else(|| { + ErrorCode::StorageOther(format!( + "Failed to get `last_modified` metadata of the gc root object '{}'", + gc_root_path + )) + })?, Err(e) => { return if e.kind() == ErrorKind::NotFound { // Concurrent vacuum, ignore it @@ -716,13 +711,8 @@ async fn select_gc_root( gc_root_path )) })?, - Some(v) => v + Some(v) => v, }; - - let last_modified = DateTime::from_timestamp_nanos( - last_modified.into_inner().as_nanosecond() as i64, - ); - if last_modified + ASSUMPTION_MAX_TXN_DURATION < gc_root_meta_ts { gc_candidates.push(path.to_owned()); } diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs index b595a2bb2c326..7458db51ee331 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs @@ -105,7 +105,7 @@ async fn vacuum_by_duration( let meta = meta.unwrap(); if let Some(modified) = meta.last_modified() { - if timestamp - modified.into_inner().as_millisecond() < expire_time { + if timestamp - modified.timestamp_millis() < expire_time { continue; } } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index e6b54943945ff..3f861ef3b3bab 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![allow(clippy::let_and_return)] use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; @@ -298,15 +297,17 @@ mod test_accessor { } impl oio::Delete for MockDeleter { - async fn delete(&mut self, _path: &str, _args: OpDelete) -> opendal::Result<()> { + fn delete(&mut self, _path: &str, _args: OpDelete) -> opendal::Result<()> { self.size += 1; Ok(()) } - async fn close(&mut self) -> opendal::Result<()> { + async fn flush(&mut self) -> opendal::Result { self.hit_batch.store(true, Ordering::Release); + + let n = self.size; self.size = 0; - Ok(()) + Ok(n) } } @@ -900,10 +901,7 @@ async fn test_vacuum_drop_create_or_replace_impl(vacuum_stmts: &[&str]) -> Resul async fn new_local_meta() -> MetaStore { let version = &BUILD_INFO; let meta_config = MetaConfig::default(); - let meta = { - let config = meta_config.to_meta_grpc_client_conf(version); - let provider = Arc::new(MetaStoreProvider::new(config)); - provider.create_meta_store().await.unwrap() - }; - meta + let config = meta_config.to_meta_grpc_client_conf(version); + let provider = MetaStoreProvider::new(config); + provider.create_meta_store().await.unwrap() } diff --git a/src/query/service/src/history_tables/external.rs b/src/query/service/src/history_tables/external.rs index 8e1014a98b8c6..ca7e6b3a5e341 100644 --- a/src/query/service/src/history_tables/external.rs +++ b/src/query/service/src/history_tables/external.rs @@ -15,7 +15,7 @@ use std::collections::BTreeMap; use databend_common_meta_app::storage::StorageParams; -use databend_common_storage::Scheme; +use opendal::Scheme; use opendal::raw::normalize_root; #[derive(Debug)] diff --git a/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs b/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs index 1c9113707d35d..6fc71ed831bab 100644 --- a/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs +++ b/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs @@ -46,7 +46,6 @@ use databend_common_pipeline::core::Pipeline; use databend_common_pipeline::sources::PrefetchAsyncSourcer; use databend_common_pipeline_transforms::TransformPipelineHelper; use databend_common_sql::binder::resolve_file_location; -use databend_common_storage::Scheme; use databend_common_storage::StageFilesInfo; use databend_common_storage::init_stage_operator; use databend_common_storages_stage::BytesReader; @@ -55,6 +54,7 @@ use databend_common_storages_stage::InferSchemaPartInfo; use databend_common_storages_stage::LoadContext; use databend_common_users::Object; use databend_storages_common_stage::SingleFilePartition; +use opendal::Scheme; use super::parquet::ParquetInferSchemaSource; use crate::sessions::TableContext; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index a150932d32003..9be90572d0107 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![allow(clippy::manual_is_multiple_of)] - use std::collections::HashSet; use std::sync::Arc; @@ -204,7 +202,7 @@ async fn test_safety() -> Result<()> { number_of_segments, number_of_blocks, ); - let cluster_key_id = if number_of_segments % 2 == 0 { + let cluster_key_id = if number_of_segments.is_multiple_of(2) { Some(0) } else { None diff --git a/src/query/service/tests/it/storages/fuse/operations/navigate.rs b/src/query/service/tests/it/storages/fuse/operations/navigate.rs index 5e7f7c7877e32..0f68ccf085549 100644 --- a/src/query/service/tests/it/storages/fuse/operations/navigate.rs +++ b/src/query/service/tests/it/storages/fuse/operations/navigate.rs @@ -219,7 +219,7 @@ async fn test_navigate_for_purge() -> Result<()> { let meta = fuse_table.get_operator().stat(&loc).await?; let modified = meta.last_modified(); assert!(modified.is_some()); - let millis = modified.unwrap().into_inner().as_millisecond(); + let millis = modified.unwrap().timestamp_millis(); let seconds = millis / 1000; let nanos = ((millis % 1000) * 1_000_000) as u32; let base_time = chrono::DateTime::::from_timestamp(seconds as i64, nanos) diff --git a/src/query/sql/src/planner/binder/location.rs b/src/query/sql/src/planner/binder/location.rs index 13b82aaaf04d4..d36d81a2158a9 100644 --- a/src/query/sql/src/planner/binder/location.rs +++ b/src/query/sql/src/planner/binder/location.rs @@ -44,9 +44,9 @@ use databend_common_meta_app::storage::StorageParams; use databend_common_meta_app::storage::StorageS3Config; use databend_common_meta_app::storage::StorageWebhdfsConfig; use databend_common_storage::STDIN_FD; -use databend_common_storage::Scheme; use log::LevelFilter; use log::info; +use opendal::Scheme; use opendal::raw::normalize_path; use opendal::raw::normalize_root; diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index b87f4b70f07eb..5b19b543bc328 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -222,9 +222,9 @@ where F: SnapshotGenerator + Send + Sync + 'static snapshot_gen .as_any() .downcast_ref::() - .is_some_and(|g| { + .is_some_and(|generator| { matches!( - g.mutation_kind, + generator.mutation_kind, MutationKind::Update | MutationKind::Delete | MutationKind::MergeInto @@ -315,14 +315,14 @@ where F: SnapshotGenerator + Send + Sync + 'static snapshot_gen .as_any() .downcast_ref::() - .is_some_and(|g| matches!(g.mode(), TruncateMode::DropAll)) + .is_some_and(|generator| matches!(generator.mode(), TruncateMode::DropAll)) } fn need_truncate(&self) -> bool { self.snapshot_gen .as_any() .downcast_ref::() - .is_some_and(|g| !matches!(g.mode(), TruncateMode::Delete)) + .is_some_and(|generator| !matches!(generator.mode(), TruncateMode::Delete)) } fn is_append_only_txn(&self) -> bool { diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index 20b973b4d3357..e6ae0e3d1f3bb 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -437,13 +437,11 @@ impl FuseTable { let meta = op.stat(de.path()).await?; meta.last_modified() }; + let location = de.path().to_string(); if let Some(modified) = modified { - let utc_modified = DateTime::from_timestamp_nanos( - modified.into_inner().as_nanosecond() as i64, - ); - if f(location.clone(), utc_modified) { - file_list.push((location, utc_modified)); + if f(location.clone(), modified) { + file_list.push((location, modified)); } } } diff --git a/src/query/storages/system/src/temp_files_table.rs b/src/query/storages/system/src/temp_files_table.rs index ff6f6f3726394..c04be12740f0c 100644 --- a/src/query/storages/system/src/temp_files_table.rs +++ b/src/query/storages/system/src/temp_files_table.rs @@ -52,7 +52,7 @@ use futures::stream::Take; use opendal::Lister; use opendal::Metadata; use opendal::Operator; -use opendal::options::ListOptions; +use opendal::operator_futures::FutureLister; use crate::table::SystemTablePart; @@ -152,35 +152,22 @@ impl TempFilesTable { let limit = push_downs.as_ref().and_then(|x| x.limit); let operator = DataOperator::instance().spill_operator(); - let lister = { - let op = operator.clone(); - let path = location_prefix.clone(); - async move { - op.lister_options(&path, ListOptions { - recursive: true, - ..Default::default() - }) - .await - } - }; + let lister = operator.lister_with(&location_prefix).recursive(true); let stream = { let prefix = location_prefix.clone(); let mut counter = 0; let ctx = ctx.clone(); - - stream_source_from_entry_lister_with_chunk_size( - operator.clone(), - lister, - limit, - MAX_BATCH_SIZE, - move |entries| { + let builder = ListerStreamSourceBuilder::with_lister_fut(operator, lister); + builder + .limit_opt(limit) + .chunk_size(MAX_BATCH_SIZE) + .build(move |entries| { counter += entries.len(); let block = Self::block_from_entries(&prefix, entries)?; ctx.set_status_info(format!("{} entries processed", counter).as_str()); Ok(block) - }, - )? + })? }; StreamSource::create(ctx.get_scan_progress(), Some(stream), output) @@ -215,11 +202,8 @@ impl TempFilesTable { if metadata.is_file() { temp_files_name.push(path.trim_start_matches(location_prefix).to_string()); - temp_files_last_modified.push( - metadata - .last_modified() - .map(|x| x.into_inner().as_microsecond()), - ); + temp_files_last_modified + .push(metadata.last_modified().map(|x| x.timestamp_micros())); temp_files_content_length.push(metadata.content_length()); } } @@ -235,9 +219,54 @@ impl TempFilesTable { const MAX_BATCH_SIZE: usize = 1000; +pub struct ListerStreamSourceBuilder +where T: Future> + Send + 'static +{ + op: Operator, + lister_fut: FutureLister, + limit: Option, + chunk_size: usize, +} + +impl ListerStreamSourceBuilder +where T: Future> + Send + 'static +{ + pub fn with_lister_fut(op: Operator, lister_fut: FutureLister) -> Self { + Self { + op, + lister_fut, + limit: None, + chunk_size: MAX_BATCH_SIZE, + } + } + + pub fn limit_opt(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + pub fn chunk_size(mut self, chunk_size: usize) -> Self { + self.chunk_size = chunk_size; + self + } + + pub fn build( + self, + block_builder: impl FnMut(Vec<(String, Metadata)>) -> Result + Sync + Send + 'static, + ) -> Result { + stream_source_from_entry_lister_with_chunk_size( + self.op.clone(), + self.lister_fut, + self.limit, + self.chunk_size, + block_builder, + ) + } +} + fn stream_source_from_entry_lister_with_chunk_size( op: Operator, - lister_fut: T, + lister_fut: FutureLister, limit: Option, chunk_size: usize, block_builder: impl FnMut(Vec<(String, Metadata)>) -> Result + Sync + Send + 'static, @@ -246,7 +275,7 @@ where T: Future> + Send + 'static, { enum ListerState> + Send + 'static> { - Uninitialized(U), + Uninitialized(FutureLister), Initialized(Chunks>), }