
feat(connector): add minio file scan type and enhance test #19950


Merged
16 commits, merged on Jan 7, 2025
55 changes: 55 additions & 0 deletions e2e_test/s3/file_sink.py
@@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix):
def _table():
return 's3_test_parquet'

print("test table function file scan")
cur.execute(f'''
SELECT
id,
name,
sex,
mark,
test_int,
test_int8,
test_uint8,
test_uint16,
test_uint32,
test_uint64,
test_float_16,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
FROM file_scan(
'parquet',
'minio',
'custom',
'hummockadmin',
'hummockadmin',
's3://hummock001/test_file_scan/test_file_scan.parquet'
);''')
result = cur.fetchone()
assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.'

print("file scan test pass")
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id bigint primary key,
@@ -491,7 +531,22 @@ def _assert_greater(field, got, expect):
_s3(idx),
_local(idx)
)
# put parquet file to test table function file scan
if data:
first_file_data = data[0]
first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data))

first_file_name = f"test_file_scan.parquet"
first_file_path = f"test_file_scan/{first_file_name}"

pq.write_table(first_table, "data_0.parquet")

client.fput_object(
"hummock001",
first_file_path,
"data_0.parquet"
)

# do test
do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

1 change: 1 addition & 0 deletions proto/batch_plan.proto
@@ -88,6 +88,7 @@ message FileScanNode {
enum StorageType {
STORAGE_TYPE_UNSPECIFIED = 0;
S3 = 1;
MINIO = 2;
}

repeated plan_common.ColumnDesc columns = 1;
14 changes: 13 additions & 1 deletion src/batch/executors/src/executor/s3_file_scan.rs
@@ -41,6 +41,7 @@ pub struct S3FileScanExecutor {
batch_size: usize,
schema: Schema,
identity: String,
is_minio: bool,
}

impl Executor for S3FileScanExecutor {
@@ -67,6 +68,7 @@ impl S3FileScanExecutor {
batch_size: usize,
schema: Schema,
identity: String,
is_minio: bool,
) -> Self {
Self {
file_format,
@@ -77,6 +79,7 @@
batch_size,
schema,
identity,
is_minio,
}
}

@@ -90,6 +93,7 @@
self.s3_access_key.clone(),
self.s3_secret_key.clone(),
bucket.clone(),
self.is_minio,
)?;
let chunk_stream =
read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
@@ -115,7 +119,14 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
NodeBody::FileScan
)?;

assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32);
let storage_type = file_scan_node.storage_type;
let is_minio = if storage_type == (StorageType::S3 as i32) {
false
} else if storage_type == (StorageType::Minio as i32) {
true
} else {
todo!()
};

Ok(Box::new(S3FileScanExecutor::new(
match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
@@ -129,6 +140,7 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
source.context().get_config().developer.chunk_size,
Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
source.plan_node().get_identity().clone(),
is_minio,
)))
}
}
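
Note: the builder above replaces the former assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32) with an if/else chain and a todo!() fallback. An equivalent mapping could lean on the TryFrom<i32> impl that the prost-generated enum already provides (the same mechanism used for FileFormat a few lines below); this is only an illustrative sketch, not part of the diff:

    // Sketch only: derive the `is_minio` flag from the proto storage type.
    let is_minio = match StorageType::try_from(file_scan_node.storage_type) {
        Ok(StorageType::Minio) => true,
        Ok(StorageType::S3) => false,
        _ => todo!("unsupported storage type"),
    };
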
44 changes: 16 additions & 28 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -109,16 +109,23 @@ pub fn new_s3_operator(
s3_access_key: String,
s3_secret_key: String,
bucket: String,
is_minio: bool,
) -> ConnectorResult<Operator> {
// Create s3 builder.
let mut builder = S3::default().bucket(&bucket).region(&s3_region);
builder = builder.secret_access_key(&s3_access_key);
builder = builder.secret_access_key(&s3_secret_key);
builder = builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
));

let mut builder = S3::default();
builder = builder
.region(&s3_region)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.bucket(&bucket);
builder = match is_minio {
true => builder.endpoint(&format!("http://{}.127.0.0.1:9301", bucket)),
false => builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
)),
};
builder = builder.disable_config_load();
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
@@ -143,29 +150,10 @@ pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String,
Ok((bucket, file_name))
}

pub async fn list_s3_directory(
s3_region: String,
s3_access_key: String,
s3_secret_key: String,
dir: String,
) -> Result<Vec<String>, anyhow::Error> {
pub async fn list_s3_directory(op: Operator, dir: String) -> Result<Vec<String>, anyhow::Error> {
let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
let prefix = format!("s3://{}/", bucket);
if dir.starts_with(&prefix) {
let mut builder = S3::default();
builder = builder
.region(&s3_region)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.bucket(&bucket);
builder = builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
));
let op = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();

op.list(&file_name)
.await
.map_err(|e| anyhow!(e))
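
For orientation, a hypothetical caller of the reworked helpers could look like the sketch below; the region, credentials, bucket, and path are the MinIO values used by the e2e test above and are illustrative only:

    // Illustrative sketch: build one operator and reuse it for listing and reads.
    let op = new_s3_operator(
        "custom".to_owned(),        // region, as passed by the e2e test
        "hummockadmin".to_owned(),  // access key
        "hummockadmin".to_owned(),  // secret key
        "hummock001".to_owned(),    // bucket
        true,                       // is_minio: use the local endpoint instead of AWS
    )?;
    let files = list_s3_directory(op.clone(), "s3://hummock001/test_file_scan/".to_owned()).await?;
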
30 changes: 14 additions & 16 deletions src/frontend/src/expr/table_function.rs
@@ -133,7 +133,9 @@ impl TableFunction {
.into());
}

if !"s3".eq_ignore_ascii_case(&eval_args[1]) {
if !"s3".eq_ignore_ascii_case(&eval_args[1])
&& !"minio".eq_ignore_ascii_case(&eval_args[1])
{
return Err(BindError(
"file_scan function only accepts 's3' as storage type".to_owned(),
)
@@ -148,21 +150,23 @@

#[cfg(not(madsim))]
{
let (bucket, _) = extract_bucket_and_file_name(&eval_args[5].clone())?;

let op = new_s3_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
"minio".eq_ignore_ascii_case(&eval_args[1]),
)?;
let files = if eval_args[5].ends_with('/') {
let files = tokio::task::block_in_place(|| {
FRONTEND_RUNTIME.block_on(async {
let files = list_s3_directory(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;
let files = list_s3_directory(op.clone(), eval_args[5].clone()).await?;

Ok::<Vec<String>, anyhow::Error>(files)
})
})?;

if files.is_empty() {
return Err(BindError(
"file_scan function only accepts non-empty directory".to_owned(),
@@ -181,13 +185,7 @@
Some(files) => files[0].clone(),
None => eval_args[5].clone(),
};
let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
let op = new_s3_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
)?;
let (_, file_name) = extract_bucket_and_file_name(&location)?;

let fields = get_parquet_fields(op, file_name).await?;

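Taken together, the table_function.rs changes reduce to the following simplified flow (a sketch only; error handling, the empty-directory check, and the madsim gate are omitted, and the argument layout follows the binder: format, storage type, region, access key, secret key, location):

    // Sketch of the new binding flow: one operator serves both listing and schema inference.
    let (bucket, _) = extract_bucket_and_file_name(&eval_args[5])?;
    let op = new_s3_operator(
        eval_args[2].clone(),
        eval_args[3].clone(),
        eval_args[4].clone(),
        bucket,
        "minio".eq_ignore_ascii_case(&eval_args[1]),
    )?;
    let location = if eval_args[5].ends_with('/') {
        // Directories are listed with the same operator; the first file drives schema inference.
        list_s3_directory(op.clone(), eval_args[5].clone()).await?[0].clone()
    } else {
        eval_args[5].clone()
    };
    let (_, file_name) = extract_bucket_and_file_name(&location)?;
    let fields = get_parquet_fields(op, file_name).await?;
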
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -89,6 +89,7 @@ impl ToBatchPb for BatchFileScan {
},
storage_type: match self.core.storage_type {
generic::StorageType::S3 => StorageType::S3 as i32,
generic::StorageType::Minio => StorageType::Minio as i32,
},
s3_region: self.core.s3_region.clone(),
s3_access_key: self.core.s3_access_key.clone(),
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -27,6 +27,7 @@ pub enum FileFormat {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StorageType {
S3,
Minio,
}

#[derive(Debug, Clone, Educe)]
12 changes: 9 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -50,12 +50,18 @@ impl LogicalFileScan {
file_location: Vec<String>,
) -> Self {
assert!("parquet".eq_ignore_ascii_case(&file_format));
assert!("s3".eq_ignore_ascii_case(&storage_type));

assert!(
"s3".eq_ignore_ascii_case(&storage_type) || "minio".eq_ignore_ascii_case(&storage_type)
);
let storage_type = if "s3".eq_ignore_ascii_case(&storage_type) {
generic::StorageType::S3
} else {
generic::StorageType::Minio
};
let core = generic::FileScan {
schema,
file_format: generic::FileFormat::Parquet,
storage_type: generic::StorageType::S3,
storage_type,
s3_region,
s3_access_key,
s3_secret_key,
src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs
@@ -58,7 +58,10 @@ impl Rule for TableFunctionToFileScanRule {
}
}
assert!("parquet".eq_ignore_ascii_case(&eval_args[0]));
assert!("s3".eq_ignore_ascii_case(&eval_args[1]));
assert!(
("s3".eq_ignore_ascii_case(&eval_args[1])
|| "minio".eq_ignore_ascii_case(&eval_args[1]))
);
let s3_region = eval_args[2].clone();
let s3_access_key = eval_args[3].clone();
let s3_secret_key = eval_args[4].clone();
@@ -69,7 +72,7 @@ impl Rule for TableFunctionToFileScanRule {
logical_table_function.ctx(),
schema,
"parquet".to_owned(),
"s3".to_owned(),
eval_args[1].to_owned(),
s3_region,
s3_access_key,
s3_secret_key,
1 change: 0 additions & 1 deletion src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -78,7 +78,6 @@ impl OpendalObjectStore {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let builder = S3::default()
.bucket(bucket)
.region("custom")