diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py
index a64f40d0692df..bdabdbbd08e44 100644
--- a/e2e_test/s3/file_sink.py
+++ b/e2e_test/s3/file_sink.py
@@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix):
     def _table():
         return 's3_test_parquet'
 
+    print("test table function file scan")
+    cur.execute(f'''
+    SELECT
+        id,
+        name,
+        sex,
+        mark,
+        test_int,
+        test_int8,
+        test_uint8,
+        test_uint16,
+        test_uint32,
+        test_uint64,
+        test_float_16,
+        test_real,
+        test_double_precision,
+        test_varchar,
+        test_bytea,
+        test_date,
+        test_time,
+        test_timestamp_s,
+        test_timestamp_ms,
+        test_timestamp_us,
+        test_timestamp_ns,
+        test_timestamptz_s,
+        test_timestamptz_ms,
+        test_timestamptz_us,
+        test_timestamptz_ns
+    FROM file_scan(
+        'parquet',
+        's3',
+        'http://127.0.0.1:9301',
+        'hummockadmin',
+        'hummockadmin',
+        's3://hummock001/test_file_scan/test_file_scan.parquet'
+    );''')
+    result = cur.fetchone()
+    assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expected 0.'
+
+    print("file scan test pass")
     # Execute a SELECT statement
     cur.execute(f'''CREATE TABLE {_table()}(
         id bigint primary key,
@@ -491,6 +531,21 @@ def _assert_greater(field, got, expect):
             _s3(idx),
             _local(idx)
         )
 
+    # upload a parquet file for the table function file scan test
+    if data:
+        first_file_data = data[0]
+        first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data))
+
+        first_file_name = "test_file_scan.parquet"
+        first_file_path = f"test_file_scan/{first_file_name}"
+
+        pq.write_table(first_table, "data_0.parquet")
+
+        client.fput_object(
+            "hummock001",
+            first_file_path,
+            "data_0.parquet"
+        )
     # do test
     do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)
 
diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 2373b7d483e30..80667d51c05de 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -97,6 +97,7 @@ message FileScanNode {
   string s3_access_key = 5;
   string s3_secret_key = 6;
   repeated string file_location = 7;
+  bool is_minio = 8;
 }
 
 // NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs
index 8140011dfcfce..64373df5053ef 100644
--- a/src/batch/executors/src/executor/s3_file_scan.rs
+++ b/src/batch/executors/src/executor/s3_file_scan.rs
@@ -20,7 +20,6 @@ use risingwave_connector::source::iceberg::{
     extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
 };
 use risingwave_pb::batch_plan::file_scan_node;
-use risingwave_pb::batch_plan::file_scan_node::StorageType;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 
 use crate::error::BatchError;
@@ -38,6 +37,7 @@ pub struct S3FileScanExecutor {
     s3_region: String,
     s3_access_key: String,
     s3_secret_key: String,
+    is_minio: bool,
     batch_size: usize,
     schema: Schema,
     identity: String,
@@ -67,6 +67,7 @@ impl S3FileScanExecutor {
         batch_size: usize,
         schema: Schema,
         identity: String,
+        is_minio: bool,
     ) -> Self {
         Self {
             file_format,
@@ -74,6 +75,7 @@ impl S3FileScanExecutor {
             s3_region,
             s3_access_key,
             s3_secret_key,
+            is_minio,
             batch_size,
             schema,
             identity,
@@ -90,6 +92,7 @@ impl S3FileScanExecutor {
             self.s3_access_key.clone(),
             self.s3_secret_key.clone(),
             bucket.clone(),
+            self.is_minio,
         )?;
 
         let chunk_stream = read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
@@ -115,8 +118,6 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
             NodeBody::FileScan
         )?;
 
-        assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32);
-
         Ok(Box::new(S3FileScanExecutor::new(
             match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                 file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
@@ -129,6 +130,7 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
             source.context().get_config().developer.chunk_size,
             Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
             source.plan_node().get_identity().clone(),
+            file_scan_node.is_minio,
         )))
     }
 }
diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs
index 49b6d9a425276..b040570e5c41d 100644
--- a/src/connector/src/source/iceberg/parquet_file_handler.rs
+++ b/src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -109,15 +109,30 @@ pub fn new_s3_operator(
     s3_access_key: String,
     s3_secret_key: String,
     bucket: String,
+    is_minio: bool,
 ) -> ConnectorResult<Operator> {
-    // Create s3 builder.
-    let mut builder = S3::default().bucket(&bucket).region(&s3_region);
-    builder = builder.secret_access_key(&s3_access_key);
-    builder = builder.secret_access_key(&s3_secret_key);
-    builder = builder.endpoint(&format!(
-        "https://{}.s3.{}.amazonaws.com",
-        bucket, s3_region
-    ));
+    let mut builder = S3::default();
+    builder = match is_minio {
+        true => {
+            builder
+                .region("us-east-1") // hard-coded: the builder requires a region, but MinIO ignores it.
+                .access_key_id(&s3_access_key)
+                .secret_access_key(&s3_secret_key)
+                .endpoint(&s3_region) // for the MinIO backend, the `s3_region` argument carries the endpoint URL.
+                .bucket(&bucket)
+        }
+        false => builder
+            .region(&s3_region)
+            .access_key_id(&s3_access_key)
+            .secret_access_key(&s3_secret_key)
+            .endpoint(&format!(
+                "https://{}.s3.{}.amazonaws.com",
+                bucket, s3_region
+            ))
+            .bucket(&bucket),
+    };
+
+    builder = builder.disable_config_load();
 
     let op: Operator = Operator::new(builder)?
         .layer(LoggingLayer::default())
@@ -143,29 +158,10 @@ pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String,
     Ok((bucket, file_name))
 }
 
-pub async fn list_s3_directory(
-    s3_region: String,
-    s3_access_key: String,
-    s3_secret_key: String,
-    dir: String,
-) -> Result<Vec<String>, anyhow::Error> {
+pub async fn list_s3_directory(op: Operator, dir: String) -> Result<Vec<String>, anyhow::Error> {
     let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
     let prefix = format!("s3://{}/", bucket);
     if dir.starts_with(&prefix) {
-        let mut builder = S3::default();
-        builder = builder
-            .region(&s3_region)
-            .access_key_id(&s3_access_key)
-            .secret_access_key(&s3_secret_key)
-            .bucket(&bucket);
-        builder = builder.endpoint(&format!(
-            "https://{}.s3.{}.amazonaws.com",
-            bucket, s3_region
-        ));
-        let op = Operator::new(builder)?
-            .layer(RetryLayer::default())
-            .finish();
-
         op.list(&file_name)
             .await
             .map_err(|e| anyhow!(e))
diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs
index d49b4332b117f..5ee1097ed5ab8 100644
--- a/src/frontend/src/expr/table_function.rs
+++ b/src/frontend/src/expr/table_function.rs
@@ -148,21 +148,24 @@ impl TableFunction {
 
             #[cfg(not(madsim))]
             {
+                let (bucket, _) = extract_bucket_and_file_name(&eval_args[5].clone())?;
+
+                let is_minio = eval_args[2].starts_with("http");
+                let op = new_s3_operator(
+                    eval_args[2].clone(),
+                    eval_args[3].clone(),
+                    eval_args[4].clone(),
+                    bucket.clone(),
+                    is_minio,
+                )?;
                 let files = if eval_args[5].ends_with('/') {
                     let files = tokio::task::block_in_place(|| {
                         FRONTEND_RUNTIME.block_on(async {
-                            let files = list_s3_directory(
-                                eval_args[2].clone(),
-                                eval_args[3].clone(),
-                                eval_args[4].clone(),
-                                eval_args[5].clone(),
-                            )
-                            .await?;
+                            let files = list_s3_directory(op.clone(), eval_args[5].clone()).await?;
                             Ok::<Vec<String>, anyhow::Error>(files)
                         })
                     })?;
-
                     if files.is_empty() {
                         return Err(BindError(
                             "file_scan function only accepts non-empty directory".to_owned(),
@@ -181,13 +184,7 @@ impl TableFunction {
                     Some(files) => files[0].clone(),
                     None => eval_args[5].clone(),
                 };
-                let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
-                let op = new_s3_operator(
-                    eval_args[2].clone(),
-                    eval_args[3].clone(),
-                    eval_args[4].clone(),
-                    bucket.clone(),
-                )?;
+                let (_, file_name) = extract_bucket_and_file_name(&location)?;
 
                 let fields = get_parquet_fields(op, file_name).await?;
 
diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
index 649c178855ef9..20d0ecdd03673 100644
--- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -94,6 +94,7 @@ impl ToBatchPb for BatchFileScan {
             s3_access_key: self.core.s3_access_key.clone(),
             s3_secret_key: self.core.s3_secret_key.clone(),
             file_location: self.core.file_location.clone(),
+            is_minio: self.core.is_minio,
         })
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
index 975151d89c797..d1f9dc1c14c0c 100644
--- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -39,6 +39,7 @@ pub struct FileScan {
     pub s3_access_key: String,
     pub s3_secret_key: String,
     pub file_location: Vec<String>,
+    pub is_minio: bool,
 
     #[educe(PartialEq(ignore))]
     #[educe(Hash(ignore))]
diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
index abe8e40a8224f..00016b2c91141 100644
--- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -48,19 +48,21 @@ impl LogicalFileScan {
         s3_access_key: String,
         s3_secret_key: String,
         file_location: Vec<String>,
+        is_minio: bool,
     ) -> Self {
         assert!("parquet".eq_ignore_ascii_case(&file_format));
         assert!("s3".eq_ignore_ascii_case(&storage_type));
-
+        let storage_type = generic::StorageType::S3;
         let core = generic::FileScan {
             schema,
             file_format: generic::FileFormat::Parquet,
-            storage_type: generic::StorageType::S3,
+            storage_type,
             s3_region,
             s3_access_key,
             s3_secret_key,
             file_location,
             ctx,
+            is_minio,
         };
 
         let base = PlanBase::new_logical_with_core(&core);
diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
index ab538fb223bd7..d1b7e22b49f8f 100644
--- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
+++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
@@ -58,22 +58,27 @@ impl Rule for TableFunctionToFileScanRule {
                 }
             }
             assert!("parquet".eq_ignore_ascii_case(&eval_args[0]));
-            assert!("s3".eq_ignore_ascii_case(&eval_args[1]));
+            assert!(
+                ("s3".eq_ignore_ascii_case(&eval_args[1])
+                    || "minio".eq_ignore_ascii_case(&eval_args[1]))
+            );
             let s3_region = eval_args[2].clone();
             let s3_access_key = eval_args[3].clone();
             let s3_secret_key = eval_args[4].clone();
             // The rest of the arguments are file locations
             let file_location = eval_args[5..].iter().cloned().collect_vec();
+            let is_minio = s3_region.starts_with("http");
             Some(
                 LogicalFileScan::new(
                     logical_table_function.ctx(),
                     schema,
                     "parquet".to_owned(),
-                    "s3".to_owned(),
+                    eval_args[1].to_owned(),
                     s3_region,
                     s3_access_key,
                     s3_secret_key,
                     file_location,
+                    is_minio,
                 )
                 .into(),
             )
diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs
index d44b1f745df75..7a71ada5ca4b8 100644
--- a/src/object_store/src/object/opendal_engine/opendal_s3.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -78,7 +78,6 @@ impl OpendalObjectStore {
             "http://"
         };
         let (address, bucket) = rest.split_once('/').unwrap();
-
         let builder = S3::default()
             .bucket(bucket)
             .region("custom")
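
Read end to end, the diff threads a new `is_minio` flag from the `file_scan(...)` arguments (MinIO is detected by the `http` prefix of the region argument, which then carries the endpoint URL) through `LogicalFileScan`, the `FileScanNode` proto, and `S3FileScanExecutor` into `new_s3_operator`. Below is a minimal sketch of that call chain, assuming only the signatures shown in this diff; the endpoint, credentials, and object path are placeholders copied from the e2e test, and the conversion of the connector errors into `anyhow::Result` is an assumption, not part of the change.

// Sketch only: mirrors the call chain in S3FileScanExecutor::do_execute above.
use risingwave_connector::source::iceberg::{
    extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
};

async fn scan_one_minio_file() -> anyhow::Result<()> {
    // Placeholder location and credentials, taken from the e2e test above.
    let location = "s3://hummock001/test_file_scan/test_file_scan.parquet";
    let (bucket, file_name) = extract_bucket_and_file_name(location)?;

    // For MinIO, the `s3_region` argument carries the endpoint URL, and
    // `is_minio` is inferred from its "http" prefix (see the rule change above).
    let s3_region = "http://127.0.0.1:9301".to_owned();
    let is_minio = s3_region.starts_with("http");

    let op = new_s3_operator(
        s3_region,
        "hummockadmin".to_owned(), // access key
        "hummockadmin".to_owned(), // secret key
        bucket,
        is_minio,
    )?;

    // Same call shape as the executor: no row-group bounds, offset 0, batch size 1024.
    let _chunk_stream = read_parquet_file(op, file_name, None, None, 1024, 0).await?;
    Ok(())
}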