diff --git a/Cargo.lock b/Cargo.lock index 89c1ab4d034ac..a3c51078ff170 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4263,7 +4263,7 @@ dependencies = [ "iceberg-catalog-hms", "iceberg-catalog-rest", "serde", - "tokio", + "serde_json", "typetag", ] @@ -8315,7 +8315,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=fe5df3f#fe5df3fc432f6f3c0e235bca710a3c7db0c5f369" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" dependencies = [ "anyhow", "apache-avro", @@ -8357,12 +8357,13 @@ dependencies = [ "typed-builder 0.20.0", "url", "uuid", + "zstd 0.13.2", ] [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=fe5df3f#fe5df3fc432f6f3c0e235bca710a3c7db0c5f369" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" dependencies = [ "anyhow", "async-trait", @@ -8379,7 +8380,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-hms" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=fe5df3f#fe5df3fc432f6f3c0e235bca710a3c7db0c5f369" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" dependencies = [ "anyhow", "async-trait", @@ -8398,7 +8399,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=fe5df3f#fe5df3fc432f6f3c0e235bca710a3c7db0c5f369" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=01d706a1#01d706a1b01a3f93cbc9f4a43b3030b963104e47" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 378b7d40d550c..7cd1a9463d57d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -307,10 +307,10 @@ http = "1" humantime = "2.1.0" hyper = "1" hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] } -iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" } -iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" } -iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" } -iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" } +iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } +iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } +iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } +iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "01d706a1" } indexmap = "2.0.0" indicatif = "0.17.5" itertools = "0.13.0" diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml index c832aa949c730..a1a3a34d6d200 100644 --- a/src/query/storages/iceberg/Cargo.toml +++ b/src/query/storages/iceberg/Cargo.toml @@ -29,7 +29,7 @@ iceberg-catalog-glue = { workspace = true } iceberg-catalog-hms = { workspace = true } iceberg-catalog-rest = { workspace = true } serde = { workspace = true } -tokio = { workspace = true } +serde_json = { workspace = true } typetag = { workspace = true } [lints] diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index eba5034fc5b76..b41ccaf7aaacd 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -13,6 +13,8 @@ // limitations under the License. use std::any::Any; +use std::collections::BTreeMap; +use std::collections::HashMap; use std::sync::Arc; use arrow_schema::Schema as ArrowSchema; @@ -40,7 +42,7 @@ use databend_common_meta_app::schema::TableMeta; use databend_common_pipeline_core::Pipeline; use databend_storages_common_table_meta::table::ChangeType; use futures::TryStreamExt; -use tokio::sync::OnceCell; +use iceberg::io::FileIOBuilder; use crate::partition::IcebergPartInfo; use crate::predicate::PredicateBuilder; @@ -53,28 +55,15 @@ pub const ICEBERG_ENGINE: &str = "ICEBERG"; #[derive(Clone)] pub struct IcebergTable { info: TableInfo, - ctl: IcebergCatalog, - database_name: String, - table_name: String, - table: OnceCell, + pub table: iceberg::table::Table, } impl IcebergTable { /// create a new table on the table directory - #[async_backtrace::framed] pub fn try_create(info: TableInfo) -> Result> { - let ctl = IcebergCatalog::try_create(info.catalog_info.clone())?; - let (db_name, table_name) = info.desc.as_str().rsplit_once('.').ok_or_else(|| { - ErrorCode::BadArguments(format!("Iceberg table desc {} is invalid", &info.desc)) - })?; - Ok(Box::new(Self { - info: info.clone(), - ctl, - database_name: db_name.to_string(), - table_name: table_name.to_string(), - table: OnceCell::new(), - })) + let table = Self::parse_engine_options(&info.meta.engine_options)?; + Ok(Box::new(Self { info, table })) } pub fn description() -> StorageDescription { @@ -111,6 +100,88 @@ impl IcebergTable { TableSchema::try_from(&arrow_schema) } + /// build_engine_options will generate `engine_options` from [`iceberg::table::Table`] so that + /// we can distribute it across nodes and rebuild this table without loading from catalog again. + /// + /// We will never persist the `engine_options` to storage, so it's safe to change the implementation. + /// As long as you make sure both [`build_engine_options`] and [`parse_engine_options`] been updated. + pub fn build_engine_options(table: &iceberg::table::Table) -> Result> { + let (file_io_scheme, file_io_props) = table.file_io().clone().into_props(); + let file_io_props = serde_json::to_string(&file_io_props)?; + let metadata_location = table + .metadata_location() + .map(|v| v.to_string()) + .unwrap_or_default(); + let metadata = serde_json::to_string(table.metadata())?; + let identifier = serde_json::to_string(table.identifier())?; + + Ok(BTreeMap::from_iter([ + ("iceberg.file_io.scheme".to_string(), file_io_scheme), + ("iceberg.file_io.props".to_string(), file_io_props), + ("iceberg.metadata_location".to_string(), metadata_location), + ("iceberg.metadata".to_string(), metadata), + ("iceberg.identifier".to_string(), identifier), + ])) + } + + /// parse_engine_options will parse `engine_options` to [`BTreeMap`] so that we can rebuild the table. + /// + /// See [`build_engine_options`] for more information. + pub fn parse_engine_options( + options: &BTreeMap, + ) -> Result { + let file_io_scheme = options.get("iceberg.file_io.scheme").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.file_io.scheme", + ) + })?; + + let file_io_props: HashMap = + serde_json::from_str(options.get("iceberg.file_io.props").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.file_io.props", + ) + })?)?; + + let metadata_location = options + .get("iceberg.metadata_location") + .map(|s| s.to_string()) + .unwrap_or_default(); + + let metadata: iceberg::spec::TableMetadata = + serde_json::from_str(options.get("iceberg.metadata").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.metadata", + ) + })?)?; + + let identifier: iceberg::TableIdent = + serde_json::from_str(options.get("iceberg.identifier").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.identifier", + ) + })?)?; + + let file_io = FileIOBuilder::new(file_io_scheme) + .with_props(file_io_props) + .build() + .map_err(|err| { + ErrorCode::ReadTableDataError(format!( + "Rebuild iceberg table file io failed: {err:?}" + )) + })?; + + iceberg::table::Table::builder() + .identifier(identifier) + .metadata(metadata) + .metadata_location(metadata_location) + .file_io(file_io) + .build() + .map_err(|err| { + ErrorCode::ReadTableDataError(format!("Rebuild iceberg table failed: {err:?}")) + }) + } + /// create a new table on the table directory #[async_backtrace::framed] pub async fn try_create_from_iceberg_catalog( @@ -121,6 +192,8 @@ impl IcebergTable { let table = Self::load_iceberg_table(&ctl, database_name, table_name).await?; let table_schema = Self::get_schema(&table)?; + let engine_options = Self::build_engine_options(&table)?; + // construct table info let info = TableInfo { ident: TableIdent::new(0, 0), @@ -129,6 +202,7 @@ impl IcebergTable { meta: TableMeta { schema: Arc::new(table_schema), engine: "iceberg".to_string(), + engine_options, created_on: Utc::now(), ..Default::default() }, @@ -136,31 +210,7 @@ impl IcebergTable { ..Default::default() }; - Ok(Self { - info, - ctl, - database_name: database_name.to_string(), - table_name: table_name.to_string(), - table: OnceCell::new_with(Some(table)), - }) - } - - /// Fetch or init the iceberg table - pub async fn table(&self) -> Result<&iceberg::table::Table> { - self.table - .get_or_try_init(|| async { - let table = - Self::load_iceberg_table(&self.ctl, &self.database_name, &self.table_name) - .await - .map_err(|err| { - ErrorCode::ReadTableDataError(format!( - "Iceberg catalog load failed: {err:?}" - )) - })?; - - Ok(table) - }) - .await + Ok(Self { info, table }) } pub fn do_read_data( @@ -189,9 +239,7 @@ impl IcebergTable { _: Arc, push_downs: Option, ) -> Result<(PartStatistics, Partitions)> { - let table = self.table().await?; - - let mut scan = table.scan(); + let mut scan = self.table.scan(); if let Some(push_downs) = &push_downs { if let Some(projection) = &push_downs.projection { diff --git a/src/query/storages/iceberg/src/table_source.rs b/src/query/storages/iceberg/src/table_source.rs index ef3d55a48f4ec..edcb21fb07fce 100644 --- a/src/query/storages/iceberg/src/table_source.rs +++ b/src/query/storages/iceberg/src/table_source.rs @@ -135,11 +135,17 @@ impl Processor for IcebergTableSource { // And we should try to build another stream (in next event loop). } else if let Some(part) = self.ctx.get_partition() { let part = IcebergPartInfo::from_part(&part)?; - // TODO: enable row filter? - let reader = self.table.table().await?.reader_builder().build(); + let reader = self + .table + .table + .reader_builder() + .with_batch_size(self.ctx.get_settings().get_parquet_max_block_size()? as usize) + .with_row_group_filtering_enabled(true) + .build(); // TODO: don't use stream here. let stream = reader .read(Box::pin(stream::iter([Ok(part.to_task())]))) + .await .map_err(|err| ErrorCode::Internal(format!("iceberg data stream read: {err:?}")))?; self.stream = Some(stream); } else {