diff --git a/Cargo.lock b/Cargo.lock index e38b9366b81ac..3204080a9b543 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1023,8 +1023,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd 0.13.0", - "zstd-safe 7.0.0", + "zstd 0.13.2", + "zstd-safe 7.2.1", ] [[package]] @@ -3598,7 +3598,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -5230,7 +5230,7 @@ dependencies = [ "thiserror 2.0.3", "tracing", "twox-hash 2.0.1", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -6305,7 +6305,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb#3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6352,7 +6352,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb#3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb" dependencies = [ "anyhow", "async-trait", @@ -6369,7 +6369,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb#3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb" dependencies = [ "async-trait", "chrono", @@ -7863,7 +7863,7 @@ dependencies = [ "thiserror 1.0.63", "time", "uuid", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -8784,7 +8784,7 @@ dependencies = [ "thrift", "tokio", "twox-hash 1.6.3", - "zstd 0.13.0", + "zstd 0.13.2", "zstd-sys", ] @@ -8819,7 +8819,7 @@ dependencies = [ "thrift", "tokio", "twox-hash 1.6.3", - "zstd 0.13.0", + "zstd 0.13.2", "zstd-sys", ] @@ -11386,7 +11386,7 @@ dependencies = [ "tonic", "tracing", "workspace-hack", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -11487,7 +11487,7 @@ dependencies = [ "url", "uuid", "workspace-hack", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -12201,7 +12201,7 @@ dependencies = [ "workspace-hack", "xorf", "xxhash-rust", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -15823,7 +15823,7 @@ dependencies = [ "sha2", "toml 0.8.12", "windows-sys 0.52.0", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -16715,11 +16715,11 @@ dependencies = [ [[package]] name = "zstd" -version = "0.13.0" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ - "zstd-safe 7.0.0", + "zstd-safe 7.2.1", ] [[package]] @@ -16734,20 +16734,19 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.0.0" +version = "7.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.8+zstd.1.5.5" +version = "2.0.12+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" dependencies = [ "cc", - "libc", "pkg-config", ] diff --git a/Cargo.toml b/Cargo.toml index 4a86290bbc128..107ea5fb4df54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,9 +143,9 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f "prometheus", ] } # branch dev-rebase-main-20241030 -iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" } -iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" } -iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "53f786fb2141b51d10a173cbcb5595edd5aa52a6" } +iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb" } +iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb" } +iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "3ad5df0c3a09ebcaf218b6dc1213fdd5a0d68ccb" } opendal = "0.49" # used only by arrow-udf-flight arrow-flight = "53" diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 2373b7d483e30..a9dd800de4b9f 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -121,6 +121,17 @@ message MySqlQueryNode { string query = 7; } +message IcebergMetadataScanNode { + map with_properties = 1; + map secret_refs = 2; + enum IcebergMetadataTableType { + UNSPECIFIED = 0; + SNAPSHOTS = 1; + } + // + IcebergMetadataTableType table_type = 3; +} + message ProjectNode { repeated expr.ExprNode select_list = 1; } @@ -405,6 +416,7 @@ message PlanNode { IcebergScanNode iceberg_scan = 39; PostgresQueryNode postgres_query = 40; MySqlQueryNode mysql_query = 41; + IcebergMetadataScanNode iceberg_metadata_scan = 42; // The following nodes are used for testing. bool block_executor = 100; bool busy_loop_executor = 101; diff --git a/src/batch/executors/src/executor.rs b/src/batch/executors/src/executor.rs index e6835a009b035..07127086409c6 100644 --- a/src/batch/executors/src/executor.rs +++ b/src/batch/executors/src/executor.rs @@ -22,6 +22,7 @@ mod generic_exchange; mod group_top_n; mod hash_agg; mod hop_window; +mod iceberg_metadata_scan; mod iceberg_scan; mod insert; mod join; @@ -56,6 +57,7 @@ pub use generic_exchange::*; pub use group_top_n::*; pub use hash_agg::*; pub use hop_window::*; +pub use iceberg_metadata_scan::*; pub use iceberg_scan::*; pub use insert::*; pub use join::*; diff --git a/src/batch/executors/src/executor/iceberg_metadata_scan.rs b/src/batch/executors/src/executor/iceberg_metadata_scan.rs new file mode 100644 index 0000000000000..03ec1783204ea --- /dev/null +++ b/src/batch/executors/src/executor/iceberg_metadata_scan.rs @@ -0,0 +1,88 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures_async_stream::try_stream; +use futures_util::stream::StreamExt; +use risingwave_common::array::arrow::IcebergArrowConvert; +use risingwave_common::array::DataChunk; +use risingwave_common::catalog::Schema; +use risingwave_connector::source::iceberg::IcebergProperties; +use risingwave_connector::source::ConnectorProperties; +use risingwave_connector::WithOptionsSecResolved; +use risingwave_pb::batch_plan::plan_node::NodeBody; + +use crate::error::BatchError; +use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; + +pub struct IcebergMetadataScanExecutor { + schema: Schema, + identity: String, + iceberg_properties: IcebergProperties, +} + +impl Executor for IcebergMetadataScanExecutor { + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn identity(&self) -> &str { + &self.identity + } + + fn execute(self: Box) -> super::BoxedDataChunkStream { + self.do_execute().boxed() + } +} + +impl IcebergMetadataScanExecutor { + #[try_stream(ok = DataChunk, error = BatchError)] + async fn do_execute(self: Box) { + let table = self.iceberg_properties.load_table_v2().await?; + let snapshot = table.metadata_scan().snapshots()?; + let chunk = IcebergArrowConvert.chunk_from_record_batch(&snapshot)?; + yield chunk; + + return Ok(()); + } +} + +pub struct IcebergMetadataScanExecutorBuilder {} + +impl BoxedExecutorBuilder for IcebergMetadataScanExecutorBuilder { + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, + _inputs: Vec, + ) -> crate::error::Result { + let node = try_match_expand!( + source.plan_node().get_node_body().unwrap(), + NodeBody::IcebergMetadataScan + )?; + + let options_with_secret = + WithOptionsSecResolved::new(node.with_properties.clone(), node.secret_refs.clone()); + let iceberg_properties = if let ConnectorProperties::Iceberg(config) = + ConnectorProperties::extract(options_with_secret.clone(), true)? + { + *config + } else { + unreachable!() + }; + + Ok(Box::new(IcebergMetadataScanExecutor { + iceberg_properties, + identity: source.plan_node().get_identity().clone(), + schema: Schema::new(vec![]), + })) + } +} diff --git a/src/common/src/array/arrow/arrow_iceberg.rs b/src/common/src/array/arrow/arrow_iceberg.rs index 279cb234a3f91..1f789bfccd843 100644 --- a/src/common/src/array/arrow/arrow_iceberg.rs +++ b/src/common/src/array/arrow/arrow_iceberg.rs @@ -24,6 +24,7 @@ pub use super::arrow_53::{ arrow_array, arrow_buffer, arrow_cast, arrow_schema, FromArrow, ToArrow, }; use crate::array::{Array, ArrayError, ArrayImpl, DataChunk, DataType, DecimalArray}; +use crate::catalog::Schema; use crate::types::StructType; pub struct IcebergArrowConvert; @@ -44,6 +45,13 @@ impl IcebergArrowConvert { FromArrow::from_record_batch(self, batch) } + pub fn schema_from_arrow_schema( + &self, + schema: &arrow_schema::Schema, + ) -> Result { + FromArrow::from_schema(self, schema) + } + pub fn type_from_field(&self, field: &arrow_schema::Field) -> Result { FromArrow::from_field(self, field) } diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 7b398a58924c6..20a54893c66d2 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -53,6 +53,7 @@ use itertools::Itertools; use super::{arrow_array, arrow_buffer, arrow_cast, arrow_schema, ArrowIntervalType}; // Other import should always use the absolute path. use crate::array::*; +use crate::catalog::{Field, Schema}; use crate::types::*; use crate::util::iter_util::ZipEqFast; @@ -479,6 +480,15 @@ pub trait FromArrow { Ok(DataChunk::new(columns, batch.num_rows())) } + fn from_schema(&self, schema: &arrow_schema::Schema) -> Result { + let fields = schema + .fields() + .iter() + .map(|f| Ok(Field::new(f.name(), self.from_field(f)?))) + .try_collect::<_, Vec<_>, ArrayError>()?; + Ok(Schema::new(fields)) + } + /// Converts Arrow `Fields` to RisingWave `StructType`. fn from_fields(&self, fields: &arrow_schema::Fields) -> Result { Ok(StructType::new( diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 1cf640ca04e9a..93adf5b575c90 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -21,15 +21,18 @@ use anyhow::anyhow; use async_trait::async_trait; use futures_async_stream::for_await; use iceberg::expr::Predicate as IcebergPredicate; +use iceberg::metadata_scan::MetadataTable; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use iceberg::table::Table; use itertools::Itertools; pub use parquet_file_handler::*; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bail; use risingwave_common::catalog::{Schema, ICEBERG_SEQUENCE_NUM_COLUMN_NAME}; use risingwave_common::types::JsonbVal; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_pb::batch_plan::iceberg_metadata_scan_node::IcebergMetadataTableType; use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType; use serde::{Deserialize, Serialize}; @@ -494,3 +497,13 @@ impl SplitReader for IcebergFileReader { unimplemented!() } } + +pub fn iceberg_metadata_table_schema(table_type: IcebergMetadataTableType) -> Schema { + let arrow_schema = match table_type { + IcebergMetadataTableType::Snapshots => iceberg::metadata_scan::SnapshotsTable::schema(), + _ => unreachable!(), + }; + IcebergArrowConvert + .schema_from_arrow_schema(&arrow_schema) + .expect("should be a valid schema") +} diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 82fb74d575e86..b4cbc6bc2bc33 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -52,9 +52,9 @@ pub use insert::BoundInsert; use pgwire::pg_server::{Session, SessionId}; pub use query::BoundQuery; pub use relation::{ - BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource, - BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, - ResolveQualifiedNameError, WindowTableFunctionKind, + BoundBackCteRef, BoundBaseTable, BoundIcebergTableFunction, BoundJoin, BoundShare, + BoundShareInput, BoundSource, BoundSystemTable, BoundWatermark, BoundWindowTableFunction, + Relation, ResolveQualifiedNameError, WindowTableFunctionKind, }; pub use select::{BoundDistinct, BoundSelect}; pub use set_expr::*; diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index f2d7fbd4dd8f1..a4d61815099cd 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -48,7 +48,9 @@ pub use share::{BoundShare, BoundShareInput}; pub use subquery::BoundSubquery; pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable}; pub use watermark::BoundWatermark; -pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind}; +pub use window_table_function::{ + BoundIcebergTableFunction, BoundWindowTableFunction, WindowTableFunctionKind, +}; use crate::expr::{CorrelatedId, Depth}; @@ -63,6 +65,7 @@ pub enum Relation { Join(Box), Apply(Box), WindowTableFunction(Box), + IcebergTableFunction(Box), /// Table function or scalar function. TableFunction { expr: ExprImpl, diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index ec23c898e8561..16ccc18ecf1fb 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -21,6 +21,7 @@ use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Function, FunctionArg, FunctionArgList, ObjectName, TableAlias}; use super::watermark::is_watermark_func; +use super::window_table_function::parse_iceberg_metadata_table_type; use super::{Binder, Relation, Result, WindowTableFunctionKind}; use crate::binder::bind_context::Clause; use crate::error::ErrorCode; @@ -66,6 +67,14 @@ impl Binder { self.bind_window_table_function(alias, kind, args)?, ))); } + + // iceberg metadata table + if let Some(table_type) = parse_iceberg_metadata_table_type(func_name) { + return Ok(Relation::IcebergTableFunction(Box::new( + self.bind_iceberg_table_function(alias, table_type, args)?, + ))); + } + // watermark if is_watermark_func(func_name) { if with_ordinality { diff --git a/src/frontend/src/binder/relation/window_table_function.rs b/src/frontend/src/binder/relation/window_table_function.rs index 000be2f6a2fac..1d1158ec6ddc3 100644 --- a/src/frontend/src/binder/relation/window_table_function.rs +++ b/src/frontend/src/binder/relation/window_table_function.rs @@ -17,6 +17,8 @@ use std::str::FromStr; use itertools::Itertools; use risingwave_common::catalog::Field; use risingwave_common::types::DataType; +use risingwave_connector::source::iceberg::iceberg_metadata_table_schema; +use risingwave_pb::batch_plan::iceberg_metadata_scan_node::IcebergMetadataTableType; use risingwave_sqlparser::ast::{FunctionArg, TableAlias}; use super::{Binder, Relation, Result}; @@ -126,3 +128,58 @@ impl Binder { }) } } + +#[derive(Debug, Clone)] +pub struct BoundIcebergTableFunction { + pub(crate) input: Relation, + pub(crate) table_type: IcebergMetadataTableType, +} + +impl RewriteExprsRecursive for BoundIcebergTableFunction { + fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { + self.input.rewrite_exprs_recursive(rewriter); + } +} + +const ICEBERG_ERROR_1ST_ARG: &str = + "The 1st arg of iceberg table function should be a table name of an iceberg source."; + +impl Binder { + pub(super) fn bind_iceberg_table_function( + &mut self, + alias: Option, + table_type: IcebergMetadataTableType, + args: Vec, + ) -> Result { + let mut args = args.into_iter(); + + self.push_context(); + + let (base, _table_name) = + self.bind_relation_by_function_arg(args.next(), ICEBERG_ERROR_1ST_ARG)?; + + self.pop_context()?; + let columns = iceberg_metadata_table_schema(table_type) + .fields() + .iter() + .map(|c| Ok((false, c.clone()))) + .collect::>>()?; + + let metadata_table_name = "?"; + self.bind_table_to_context(columns, metadata_table_name.to_owned(), alias)?; + + Ok(BoundIcebergTableFunction { + input: base, + table_type, + }) + } +} + +/// `iceberg_snapshots` etc. +pub(super) fn parse_iceberg_metadata_table_type(s: &str) -> Option { + if let Some(s) = s.strip_prefix("iceberg_") { + IcebergMetadataTableType::from_str_name_not_unspecified(&s.to_ascii_uppercase()) + } else { + None + } +} diff --git a/src/frontend/src/handler/privilege.rs b/src/frontend/src/handler/privilege.rs index 1aff2e69f682f..49586c068a236 100644 --- a/src/frontend/src/handler/privilege.rs +++ b/src/frontend/src/handler/privilege.rs @@ -77,6 +77,9 @@ pub(crate) fn resolve_relation_privileges( Relation::WindowTableFunction(table) => { resolve_relation_privileges(&table.input, mode, objects) } + Relation::IcebergTableFunction(table) => { + resolve_relation_privileges(&table.input, mode, objects) + } _ => {} }; } diff --git a/src/frontend/src/optimizer/plan_node/batch_iceberg_metadata_scan.rs b/src/frontend/src/optimizer/plan_node/batch_iceberg_metadata_scan.rs new file mode 100644 index 0000000000000..91a1b9dffe358 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/batch_iceberg_metadata_scan.rs @@ -0,0 +1,100 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::hash::{Hash, Hasher}; + +use pretty_xmlish::XmlNode; +use risingwave_pb::batch_plan::plan_node::NodeBody; +use risingwave_pb::batch_plan::IcebergMetadataScanNode; + +use super::batch::prelude::*; +use super::utils::{childless_record, column_names_pretty, Distill}; +use super::{ + generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, +}; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::{Distribution, Order}; + +#[derive(Debug, Clone)] +pub struct BatchIcebergMetadataScan { + pub base: PlanBase, + pub core: generic::IcebergMetadataScan, +} + +impl PartialEq for BatchIcebergMetadataScan { + fn eq(&self, other: &Self) -> bool { + self.base == other.base && self.core == other.core + } +} + +impl Eq for BatchIcebergMetadataScan {} + +impl Hash for BatchIcebergMetadataScan { + fn hash(&self, state: &mut H) { + self.base.hash(state); + self.core.hash(state); + } +} + +impl BatchIcebergMetadataScan { + pub fn new(core: generic::IcebergMetadataScan) -> Self { + let base = PlanBase::new_batch_with_core(&core, Distribution::Single, Order::any()); + + Self { base, core } + } + + pub fn column_names(&self) -> Vec<&str> { + self.schema().names_str() + } +} + +impl_plan_tree_node_for_leaf! { BatchIcebergMetadataScan } + +impl Distill for BatchIcebergMetadataScan { + fn distill<'a>(&self) -> XmlNode<'a> { + let fields = vec![ + ("type", self.core.table_type.as_str_name().into()), + ("columns", column_names_pretty(self.schema())), + ]; + childless_record("BatchIcebergMetadataScan", fields) + } +} + +impl ToLocalBatch for BatchIcebergMetadataScan { + fn to_local(&self) -> Result { + Ok(self.clone().into()) + } +} + +impl ToDistributedBatch for BatchIcebergMetadataScan { + fn to_distributed(&self) -> Result { + Ok(self.clone().into()) + } +} + +impl ToBatchPb for BatchIcebergMetadataScan { + fn to_batch_prost_body(&self) -> NodeBody { + let (with_properties, secret_refs) = self.core.iceberg_properties.clone().into_parts(); + NodeBody::IcebergMetadataScan(IcebergMetadataScanNode { + table_type: self.core.table_type.into(), + with_properties, + secret_refs, + }) + } +} + +impl ExprRewritable for BatchIcebergMetadataScan {} + +impl ExprVisitable for BatchIcebergMetadataScan {} diff --git a/src/frontend/src/optimizer/plan_node/generic/iceberg_metadata_scan.rs b/src/frontend/src/optimizer/plan_node/generic/iceberg_metadata_scan.rs new file mode 100644 index 0000000000000..47bb40342032f --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/iceberg_metadata_scan.rs @@ -0,0 +1,65 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; +use risingwave_connector::WithOptionsSecResolved; +use risingwave_pb::batch_plan::iceberg_metadata_scan_node::IcebergMetadataTableType; + +use super::GenericPlanNode; +use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::property::FunctionalDependencySet; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct IcebergMetadataScan { + pub schema: Schema, + pub iceberg_properties: WithOptionsSecResolved, + pub table_type: IcebergMetadataTableType, + + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, +} + +impl GenericPlanNode for IcebergMetadataScan { + fn schema(&self) -> Schema { + self.schema.clone() + } + + fn stream_key(&self) -> Option> { + None + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + FunctionalDependencySet::new(self.schema.len()) + } +} + +impl IcebergMetadataScan { + pub fn columns(&self) -> Vec { + self.schema + .fields + .iter() + .enumerate() + .map(|(i, f)| { + ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone()) + }) + .collect() + } +} diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index c35a367e8ccec..87d3abcb91dc6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -88,12 +88,12 @@ pub use now::*; mod file_scan; pub use file_scan::*; - mod postgres_query; pub use postgres_query::*; - mod mysql_query; pub use mysql_query::*; +mod iceberg_metadata_scan; +pub use iceberg_metadata_scan::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 8ecdf6d9446a0..a3a93098afa26 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -114,6 +114,16 @@ impl Source { .is_some_and(|catalog| catalog.with_properties.is_kafka_connector()) } + pub fn connector_name(&self) -> String { + match &self.catalog { + Some(catalog) => catalog + .with_properties + .get_connector() + .unwrap_or("no connector".to_owned()), + None => "no connector".to_owned(), + } + } + /// Currently, only iceberg source supports time travel. pub fn support_time_travel(&self) -> bool { self.is_iceberg_connector() diff --git a/src/frontend/src/optimizer/plan_node/logical_iceberg_metadata_scan.rs b/src/frontend/src/optimizer/plan_node/logical_iceberg_metadata_scan.rs new file mode 100644 index 0000000000000..c9a63033a6508 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_iceberg_metadata_scan.rs @@ -0,0 +1,116 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pretty_xmlish::XmlNode; +use risingwave_connector::source::iceberg::iceberg_metadata_table_schema; +use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt}; +use risingwave_pb::batch_plan::iceberg_metadata_scan_node::IcebergMetadataTableType; + +use super::generic::GenericPlanRef; +use super::utils::{childless_record, Distill}; +use super::{ + generic, BatchIcebergMetadataScan, ColPrunable, ExprRewritable, Logical, LogicalProject, + PlanBase, PlanRef, PredicatePushdown, ToBatch, ToStream, +}; +use crate::error::Result; +use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::utils::column_names_pretty; +use crate::optimizer::plan_node::{ + ColumnPruningContext, LogicalFilter, PredicatePushdownContext, RewriteStreamContext, + ToStreamContext, +}; +use crate::utils::{ColIndexMapping, Condition}; +use crate::OptimizerContextRef; + +/// `LogicalIcebergMetadataScan` is only used by batch queries. At the beginning of the batch query optimization, `LogicalSource` with a iceberg property would be converted into a `LogicalIcebergMetadataScan`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalIcebergMetadataScan { + pub base: PlanBase, + pub core: generic::IcebergMetadataScan, +} + +impl LogicalIcebergMetadataScan { + pub fn new( + iceberg_properties: WithOptionsSecResolved, + table_type: IcebergMetadataTableType, + ctx: OptimizerContextRef, + ) -> Self { + assert!(iceberg_properties.is_iceberg_connector()); + let schema = iceberg_metadata_table_schema(table_type); + + let core = generic::IcebergMetadataScan { + schema, + iceberg_properties, + table_type, + ctx, + }; + let base = PlanBase::new_logical_with_core(&core); + + LogicalIcebergMetadataScan { base, core } + } +} + +impl_plan_tree_node_for_leaf! {LogicalIcebergMetadataScan} +impl Distill for LogicalIcebergMetadataScan { + fn distill<'a>(&self) -> XmlNode<'a> { + let fields = vec![ + ("table_type", self.core.table_type.as_str_name().into()), + ("columns", column_names_pretty(self.schema())), + ]; + childless_record("LogicalIcebergMetadataScan", fields) + } +} + +impl ColPrunable for LogicalIcebergMetadataScan { + fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef { + // no pruning + let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len()); + LogicalProject::with_mapping(self.clone().into(), mapping).into() + } +} + +impl ExprRewritable for LogicalIcebergMetadataScan {} + +impl ExprVisitable for LogicalIcebergMetadataScan {} + +impl PredicatePushdown for LogicalIcebergMetadataScan { + fn predicate_pushdown( + &self, + predicate: Condition, + _ctx: &mut PredicatePushdownContext, + ) -> PlanRef { + // No pushdown. + LogicalFilter::create(self.clone().into(), predicate) + } +} + +impl ToBatch for LogicalIcebergMetadataScan { + fn to_batch(&self) -> Result { + let plan: PlanRef = BatchIcebergMetadataScan::new(self.core.clone()).into(); + Ok(plan) + } +} + +impl ToStream for LogicalIcebergMetadataScan { + fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { + unreachable!() + } + + fn logical_rewrite_for_stream( + &self, + _ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + unreachable!() + } +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 5e7e76500e9a0..bc92f314935d8 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -989,6 +989,7 @@ mod stream_values; mod stream_watermark_filter; mod batch_file_scan; +mod batch_iceberg_metadata_scan; mod batch_iceberg_scan; mod batch_kafka_scan; mod batch_postgres_query; @@ -996,6 +997,7 @@ mod batch_postgres_query; mod batch_mysql_query; mod derive; mod logical_file_scan; +mod logical_iceberg_metadata_scan; mod logical_iceberg_scan; mod logical_postgres_query; @@ -1015,6 +1017,7 @@ pub use batch_group_topn::BatchGroupTopN; pub use batch_hash_agg::BatchHashAgg; pub use batch_hash_join::BatchHashJoin; pub use batch_hop_window::BatchHopWindow; +pub use batch_iceberg_metadata_scan::BatchIcebergMetadataScan; pub use batch_iceberg_scan::BatchIcebergScan; pub use batch_insert::BatchInsert; pub use batch_kafka_scan::BatchKafkaScan; @@ -1051,6 +1054,7 @@ pub use logical_expand::LogicalExpand; pub use logical_file_scan::LogicalFileScan; pub use logical_filter::LogicalFilter; pub use logical_hop_window::LogicalHopWindow; +pub use logical_iceberg_metadata_scan::LogicalIcebergMetadataScan; pub use logical_iceberg_scan::LogicalIcebergScan; pub use logical_insert::LogicalInsert; pub use logical_intersect::LogicalIntersect; @@ -1170,6 +1174,7 @@ macro_rules! for_all_plan_nodes { , { Logical, MaxOneRow } , { Logical, KafkaScan } , { Logical, IcebergScan } + , { Logical, IcebergMetadataScan } , { Logical, RecursiveUnion } , { Logical, CteRef } , { Logical, ChangeLog } @@ -1206,6 +1211,7 @@ macro_rules! for_all_plan_nodes { , { Batch, MaxOneRow } , { Batch, KafkaScan } , { Batch, IcebergScan } + , { Batch, IcebergMetadataScan } , { Batch, FileScan } , { Batch, PostgresQuery } , { Batch, MySqlQuery } @@ -1286,6 +1292,7 @@ macro_rules! for_logical_plan_nodes { , { Logical, MaxOneRow } , { Logical, KafkaScan } , { Logical, IcebergScan } + , { Logical, IcebergMetadataScan } , { Logical, RecursiveUnion } , { Logical, CteRef } , { Logical, ChangeLog } @@ -1331,6 +1338,7 @@ macro_rules! for_batch_plan_nodes { , { Batch, MaxOneRow } , { Batch, KafkaScan } , { Batch, IcebergScan } + , { Batch, IcebergMetadataScan } , { Batch, FileScan } , { Batch, PostgresQuery } , { Batch, MySqlQuery } diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 7a08502f8519c..892a7110c9864 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -26,15 +26,17 @@ use risingwave_common::{bail, bail_not_implemented}; use risingwave_sqlparser::ast::AsOf; use crate::binder::{ - BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource, - BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind, + BoundBackCteRef, BoundBaseTable, BoundIcebergTableFunction, BoundJoin, BoundShare, + BoundShareInput, BoundSource, BoundSystemTable, BoundWatermark, BoundWindowTableFunction, + Relation, WindowTableFunctionKind, }; use crate::error::{ErrorCode, Result}; use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal}; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{ - LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan, - LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef, + LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalIcebergMetadataScan, LogicalJoin, + LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, + LogicalValues, PlanRef, }; use crate::optimizer::property::Cardinality; use crate::planner::{PlanFor, Planner}; @@ -53,6 +55,7 @@ impl Planner { Relation::Join(join) => self.plan_join(*join), Relation::Apply(join) => self.plan_apply(*join), Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf), + Relation::IcebergTableFunction(tf) => self.plan_iceberg_table_function(*tf), Relation::Source(s) => self.plan_source(*s), Relation::TableFunction { expr: tf, @@ -300,6 +303,44 @@ impl Planner { } } + pub(super) fn plan_iceberg_table_function( + &mut self, + table_function: BoundIcebergTableFunction, + ) -> Result { + let base = self.plan_relation(table_function.input)?; + match base.as_logical_source() { + Some(source) => { + if source.core.is_iceberg_connector() { + let iceberg_scan = LogicalIcebergMetadataScan::new( + source + .core + .catalog + .as_ref() + .unwrap() + .with_properties + .clone(), + table_function.table_type, + self.ctx(), + ); + Ok(iceberg_scan.into()) + } else { + Err(ErrorCode::BindError(format!( + "The input of iceberg metadata table function `{}` should be an iceberg source, got {} source", + table_function.table_type.as_str_name(), + source.core.connector_name() + )) + .into()) + } + } + _ => Err(ErrorCode::BindError(format!( + "The input of iceberg metadata table function `{}` should be an iceberg source, got {:?}", + table_function.table_type.as_str_name(), + base.node_type() + )) + .into()), + } + } + pub(super) fn plan_table_function( &mut self, table_function: ExprImpl, diff --git a/src/prost/build.rs b/src/prost/build.rs index e906c47efbd55..a66395ae5b80e 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -81,6 +81,7 @@ fn main() -> Result<(), Box> { ".stream_plan.StreamSource", ".batch_plan.SourceNode", ".batch_plan.IcebergScanNode", + ".batch_plan.IcebergMetadataScanNode", ]; // Build protobuf structs. diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 0ebd4d5f4a096..5941355a9ed46 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -494,6 +494,15 @@ impl std::fmt::Debug for plan_common::ColumnDesc { } } +impl batch_plan::iceberg_metadata_scan_node::IcebergMetadataTableType { + pub fn from_str_name_not_unspecified(s: &str) -> Option { + if s == "UNSPECIFIED" { + return None; + } + Self::from_str_name(s) + } +} + #[cfg(test)] mod tests { use crate::data::{data_type, DataType};