feat(connector): add minio file scan type and enhance test #19950

Status: Open · wants to merge 12 commits into base: main
55 changes: 55 additions & 0 deletions e2e_test/s3/file_sink.py
@@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix):
def _table():
return 's3_test_parquet'

print("test table function file scan")
cur.execute(f'''
SELECT
id,
name,
sex,
mark,
test_int,
test_int8,
test_uint8,
test_uint16,
test_uint32,
test_uint64,
test_float_16,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
FROM file_scan(
'parquet',
's3',
'http://127.0.0.1:9301',
'hummockadmin',
'hummockadmin',
's3://hummock001/test_file_scan/test_file_scan.parquet'
);''')
result = cur.fetchone()
assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.'

print("file scan test pass")
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id bigint primary key,
@@ -491,6 +531,21 @@ def _assert_greater(field, got, expect):
_s3(idx),
_local(idx)
)
# put parquet file to test table function file scan
if data:
first_file_data = data[0]
first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data))

first_file_name = f"test_file_scan.parquet"
first_file_path = f"test_file_scan/{first_file_name}"

pq.write_table(first_table, "data_0.parquet")

client.fput_object(
"hummock001",
first_file_path,
"data_0.parquet"
)

# do test
do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)
3 changes: 0 additions & 3 deletions src/batch/executors/src/executor/s3_file_scan.rs
@@ -20,7 +20,6 @@ use risingwave_connector::source::iceberg::{
extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::file_scan_node::StorageType;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
@@ -115,8 +114,6 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
NodeBody::FileScan
)?;

assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32);

Ok(Box::new(S3FileScanExecutor::new(
match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
53 changes: 25 additions & 28 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -110,14 +110,30 @@ pub fn new_s3_operator(
s3_secret_key: String,
bucket: String,
) -> ConnectorResult<Operator> {
// Create s3 builder.
let mut builder = S3::default().bucket(&bucket).region(&s3_region);
builder = builder.secret_access_key(&s3_access_key);
builder = builder.secret_access_key(&s3_secret_key);
builder = builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
));
let is_minio = s3_region.starts_with("http");
[Contributor review comment] Here is the executor. I think it's better to put this logic at src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs


let mut builder = S3::default();
builder = match is_minio {
true => {
builder
.region("us-east-1") // hard code as not used but needed.
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.endpoint(&s3_region) // for minio backend, the `s3_region` parameter is passed in as the endpoint.
.bucket(&bucket)
}
false => builder
.region(&s3_region)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
))
.bucket(&bucket),
};

builder = builder.disable_config_load();

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
@@ -143,29 +159,10 @@ pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String,
Ok((bucket, file_name))
}

pub async fn list_s3_directory(
s3_region: String,
s3_access_key: String,
s3_secret_key: String,
dir: String,
) -> Result<Vec<String>, anyhow::Error> {
pub async fn list_s3_directory(op: Operator, dir: String) -> Result<Vec<String>, anyhow::Error> {
let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
let prefix = format!("s3://{}/", bucket);
if dir.starts_with(&prefix) {
let mut builder = S3::default();
builder = builder
.region(&s3_region)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.bucket(&bucket);
builder = builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
));
let op = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();

op.list(&file_name)
.await
.map_err(|e| anyhow!(e))
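
For context, a usage sketch of the reworked helpers in this file: `new_s3_operator` now treats an `s3_region` argument that starts with `http` as a MinIO endpoint, and `list_s3_directory` reuses a caller-supplied operator instead of building its own. This is a minimal sketch, not code from the PR; the endpoint, credentials, and bucket are the values used by the e2e test above, and the import path mirrors the executor's `use` statement (the `list_s3_directory` re-export from that module is an assumption).

```rust
use anyhow::Result;
// Import path follows the executor's `use` in s3_file_scan.rs; the
// `list_s3_directory` re-export from this module is an assumption.
use risingwave_connector::source::iceberg::{list_s3_directory, new_s3_operator};

async fn sketch() -> Result<()> {
    // MinIO: the "region" slot carries the endpoint, detected via starts_with("http").
    // Values are the ones used by the e2e test above.
    let op = new_s3_operator(
        "http://127.0.0.1:9301".to_owned(), // endpoint, not a region
        "hummockadmin".to_owned(),          // access key
        "hummockadmin".to_owned(),          // secret key
        "hummock001".to_owned(),            // bucket
    )?;

    // Listing reuses the operator built above; `Operator` is cheap to clone.
    let files =
        list_s3_directory(op.clone(), "s3://hummock001/test_file_scan/".to_owned()).await?;
    println!("{files:?}");

    // AWS S3: a plain region name selects https://{bucket}.s3.{region}.amazonaws.com.
    let _aws_op = new_s3_operator(
        "us-east-1".to_owned(),
        "<access-key>".to_owned(),
        "<secret-key>".to_owned(),
        "my-bucket".to_owned(),
    )?;

    Ok(())
}
```
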
25 changes: 10 additions & 15 deletions src/frontend/src/expr/table_function.rs
@@ -148,21 +148,22 @@ impl TableFunction {

#[cfg(not(madsim))]
{
let (bucket, _) = extract_bucket_and_file_name(&eval_args[5].clone())?;

let op = new_s3_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
)?;
let files = if eval_args[5].ends_with('/') {
let files = tokio::task::block_in_place(|| {
FRONTEND_RUNTIME.block_on(async {
let files = list_s3_directory(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;
let files = list_s3_directory(op.clone(), eval_args[5].clone()).await?;

Ok::<Vec<String>, anyhow::Error>(files)
})
})?;

if files.is_empty() {
return Err(BindError(
"file_scan function only accepts non-empty directory".to_owned(),
@@ -181,13 +182,7 @@ impl TableFunction {
Some(files) => files[0].clone(),
None => eval_args[5].clone(),
};
let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
let op = new_s3_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
)?;
let (_, file_name) = extract_bucket_and_file_name(&location)?;

let fields = get_parquet_fields(op, file_name).await?;

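
The hunk above now builds the operator once from the bucket of the user-supplied location and reuses it for both directory listing and schema inference. As a small illustration of the location-selection step, here is a hypothetical helper (not part of the PR) that mirrors the `Some(files) => files[0]` / `None => eval_args[5]` match; in the real code an empty directory has already been rejected with a `BindError` before this point.

```rust
// Hypothetical helper mirroring the location-selection logic above:
// a directory input (trailing '/') resolves to its first listed file,
// a single-file input is used as-is.
fn schema_location(input: &str, listed: Option<&[String]>) -> String {
    match listed {
        Some(files) if !files.is_empty() => files[0].clone(),
        _ => input.to_owned(),
    }
}

fn main() {
    let listed = vec!["s3://hummock001/dir/part-0.parquet".to_owned()];
    assert_eq!(
        schema_location("s3://hummock001/dir/", Some(&listed)),
        "s3://hummock001/dir/part-0.parquet"
    );
    assert_eq!(
        schema_location("s3://hummock001/a.parquet", None),
        "s3://hummock001/a.parquet"
    );
}
```
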
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -51,11 +51,11 @@ impl LogicalFileScan {
) -> Self {
assert!("parquet".eq_ignore_ascii_case(&file_format));
assert!("s3".eq_ignore_ascii_case(&storage_type));

let storage_type = generic::StorageType::S3;
let core = generic::FileScan {
schema,
file_format: generic::FileFormat::Parquet,
storage_type: generic::StorageType::S3,
storage_type,
s3_region,
s3_access_key,
s3_secret_key,
src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
@@ -58,7 +58,10 @@ impl Rule for TableFunctionToFileScanRule {
}
}
assert!("parquet".eq_ignore_ascii_case(&eval_args[0]));
assert!("s3".eq_ignore_ascii_case(&eval_args[1]));
assert!(
("s3".eq_ignore_ascii_case(&eval_args[1])
|| "minio".eq_ignore_ascii_case(&eval_args[1]))
);
let s3_region = eval_args[2].clone();
let s3_access_key = eval_args[3].clone();
let s3_secret_key = eval_args[4].clone();
@@ -69,7 +72,7 @@ impl Rule for TableFunctionToFileScanRule {
logical_table_function.ctx(),
schema,
"parquet".to_owned(),
"s3".to_owned(),
eval_args[1].to_owned(),
s3_region,
s3_access_key,
s3_secret_key,
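
The rule now accepts either storage-type string and forwards it to `LogicalFileScan` unchanged. Below is a minimal standalone sketch of that validation, written as a helper purely for illustration; the actual rule inlines it as `assert!`s, as shown in the hunk above.

```rust
// Hypothetical helper equivalent to the assertions in this rule.
fn is_supported_storage_type(s: &str) -> bool {
    s.eq_ignore_ascii_case("s3") || s.eq_ignore_ascii_case("minio")
}

fn main() {
    assert!(is_supported_storage_type("s3"));
    assert!(is_supported_storage_type("MinIO")); // case-insensitive, matching eq_ignore_ascii_case
    assert!(!is_supported_storage_type("gcs"));
}
```
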
1 change: 0 additions & 1 deletion src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -78,7 +78,6 @@ impl OpendalObjectStore {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let builder = S3::default()
.bucket(bucket)
.region("custom")