Commit 87f0838

Use single file write when an extension is present in the path. (#13079)
* Use single file write when an extension is present in the path.
* Adjust formatting.
* Remove unneeded return statement.
1 parent 5e53b63 commit 87f0838
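
A quick sketch of the user-visible behavior this commit targets, written against DataFusion's SQL interface. This is my own illustration, not code from the commit; the table name, paths, and `COPY` options are assumptions.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t AS VALUES ('foo', 'baz'), ('bar', 'baz')")
        .await?;

    // The destination carries an extension, so the demuxer writes exactly
    // one file at this path.
    ctx.sql("COPY t TO '/tmp/out.parquet'").await?.collect().await?;

    // The destination is a directory-style path with no extension, so the
    // output may still be split into multiple files that use the default
    // extension for the chosen format.
    ctx.sql("COPY t TO '/tmp/out_dir/' STORED AS PARQUET")
        .await?
        .collect()
        .await?;
    Ok(())
}
```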

3 files changed: +206 -47 lines changed

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 135 additions & 41 deletions
```diff
@@ -2274,47 +2274,7 @@ mod tests {
 
     #[tokio::test]
     async fn parquet_sink_write() -> Result<()> {
-        let field_a = Field::new("a", DataType::Utf8, false);
-        let field_b = Field::new("b", DataType::Utf8, false);
-        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-
-        let file_sink_config = FileSinkConfig {
-            object_store_url: object_store_url.clone(),
-            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
-            table_paths: vec![ListingTableUrl::parse("file:///")?],
-            output_schema: schema.clone(),
-            table_partition_cols: vec![],
-            insert_op: InsertOp::Overwrite,
-            keep_partition_by_columns: false,
-        };
-        let parquet_sink = Arc::new(ParquetSink::new(
-            file_sink_config,
-            TableParquetOptions {
-                key_value_metadata: std::collections::HashMap::from([
-                    ("my-data".to_string(), Some("stuff".to_string())),
-                    ("my-data-bool-key".to_string(), None),
-                ]),
-                ..Default::default()
-            },
-        ));
-
-        // create data
-        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
-        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
-        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
-
-        // write stream
-        parquet_sink
-            .write_all(
-                Box::pin(RecordBatchStreamAdapter::new(
-                    schema,
-                    futures::stream::iter(vec![Ok(batch)]),
-                )),
-                &build_ctx(object_store_url.as_ref()),
-            )
-            .await
-            .unwrap();
+        let parquet_sink = create_written_parquet_sink("file:///").await?;
 
         // assert written
         let mut written = parquet_sink.written();
@@ -2366,6 +2326,140 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_sink_write_with_extension() -> Result<()> {
+        let filename = "test_file.custom_ext";
+        let file_path = format!("file:///path/to/{}", filename);
+        let parquet_sink = create_written_parquet_sink(file_path.as_str()).await?;
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet file to be written, instead found {}",
+            written.len()
+        );
+
+        let (path, ..) = written.take(1).next().unwrap();
+
+        let path_parts = path.parts().collect::<Vec<_>>();
+        assert_eq!(
+            path_parts.len(),
+            3,
+            "Expected 3 path parts, instead found {}",
+            path_parts.len()
+        );
+        assert_eq!(path_parts.last().unwrap().as_ref(), filename);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write_with_directory_name() -> Result<()> {
+        let file_path = "file:///path/to";
+        let parquet_sink = create_written_parquet_sink(file_path).await?;
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet file to be written, instead found {}",
+            written.len()
+        );
+
+        let (path, ..) = written.take(1).next().unwrap();
+
+        let path_parts = path.parts().collect::<Vec<_>>();
+        assert_eq!(
+            path_parts.len(),
+            3,
+            "Expected 3 path parts, instead found {}",
+            path_parts.len()
+        );
+        assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write_with_folder_ending() -> Result<()> {
+        let file_path = "file:///path/to/";
+        let parquet_sink = create_written_parquet_sink(file_path).await?;
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet file to be written, instead found {}",
+            written.len()
+        );
+
+        let (path, ..) = written.take(1).next().unwrap();
+
+        let path_parts = path.parts().collect::<Vec<_>>();
+        assert_eq!(
+            path_parts.len(),
+            3,
+            "Expected 3 path parts, instead found {}",
+            path_parts.len()
+        );
+        assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));
+
+        Ok(())
+    }
+
+    async fn create_written_parquet_sink(table_path: &str) -> Result<Arc<ParquetSink>> {
+        let field_a = Field::new("a", DataType::Utf8, false);
+        let field_b = Field::new("b", DataType::Utf8, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+
+        let file_sink_config = FileSinkConfig {
+            object_store_url: object_store_url.clone(),
+            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            table_paths: vec![ListingTableUrl::parse(table_path)?],
+            output_schema: schema.clone(),
+            table_partition_cols: vec![],
+            insert_op: InsertOp::Overwrite,
+            keep_partition_by_columns: false,
+        };
+        let parquet_sink = Arc::new(ParquetSink::new(
+            file_sink_config,
+            TableParquetOptions {
+                key_value_metadata: std::collections::HashMap::from([
+                    ("my-data".to_string(), Some("stuff".to_string())),
+                    ("my-data-bool-key".to_string(), None),
+                ]),
+                ..Default::default()
+            },
+        ));
+
+        // create data
+        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+        // write stream
+        parquet_sink
+            .write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::iter(vec![Ok(batch)]),
+                )),
+                &build_ctx(object_store_url.as_ref()),
+            )
+            .await
+            .unwrap();
+
+        Ok(parquet_sink)
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_partitions() -> Result<()> {
         let field_a = Field::new("a", DataType::Utf8, false);
```
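
The new tests assert on `object_store::path::Path::parts()`. As a quick aside (my own illustration, not part of the commit), this is why a destination of `file:///path/to/<name>` yields exactly three parts:

```rust
use object_store::path::Path;

fn main() {
    // Path normalizes away the leading slash, so "/path/to/x" splits into
    // three segments: "path", "to", and the file name.
    let path = Path::from("/path/to/test_file.custom_ext");
    let parts: Vec<_> = path.parts().collect();
    assert_eq!(parts.len(), 3);
    assert_eq!(parts.last().unwrap().as_ref(), "test_file.custom_ext");
}
```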

datafusion/core/src/datasource/file_format/write/demux.rs

Lines changed: 8 additions & 6 deletions
```diff
@@ -59,8 +59,9 @@ type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
 /// which should be contained within the same output file. The outer channel
 /// is used to send a dynamic number of inner channels, representing a dynamic
 /// number of total output files. The caller is also responsible to monitor
-/// the demux task for errors and abort accordingly. The single_file_output parameter
-/// overrides all other settings to force only a single file to be written.
+/// the demux task for errors and abort accordingly. A path with an extension will
+/// force only a single file to be written with the extension from the path. Otherwise
+/// the default extension will be used and the output will be split into multiple files.
 /// partition_by parameter will additionally split the input based on the unique
 /// values of a specific column `<https://github.com/apache/datafusion/issues/7744>``
 /// ┌───────────┐               ┌────────────┐    ┌─────────────┐
@@ -79,12 +80,13 @@ pub(crate) fn start_demuxer_task(
     context: &Arc<TaskContext>,
     partition_by: Option<Vec<(String, DataType)>>,
     base_output_path: ListingTableUrl,
-    file_extension: String,
+    default_extension: String,
     keep_partition_by_columns: bool,
 ) -> (SpawnedTask<Result<()>>, DemuxedStreamReceiver) {
     let (tx, rx) = mpsc::unbounded_channel();
     let context = context.clone();
-    let single_file_output = !base_output_path.is_collection();
+    let single_file_output =
+        !base_output_path.is_collection() && base_output_path.file_extension().is_some();
     let task = match partition_by {
         Some(parts) => {
             // There could be an arbitrarily large number of parallel hive style partitions being written to, so we cannot
@@ -96,7 +98,7 @@ pub(crate) fn start_demuxer_task(
                 context,
                 parts,
                 base_output_path,
-                file_extension,
+                default_extension,
                 keep_partition_by_columns,
             )
             .await
@@ -108,7 +110,7 @@
                 input,
                 context,
                 base_output_path,
-                file_extension,
+                default_extension,
                 single_file_output,
             )
             .await
```
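
To make the new gating concrete, here is a small sketch of how a few destination URLs classify. It is my own example, assuming `is_collection` and the `file_extension` method added by this commit are the public `ListingTableUrl` API:

```rust
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;

// Mirrors the demuxer's check: write a single file only when the base path
// is not a collection and its last segment carries an extension.
fn is_single_file_output(url: &ListingTableUrl) -> bool {
    !url.is_collection() && url.file_extension().is_some()
}

fn main() -> Result<()> {
    // Extension present: one file, written at exactly this path.
    let file = ListingTableUrl::parse("file:///path/to/out.parquet")?;
    assert!(is_single_file_output(&file));

    // Bare directory name, no extension: output may be split across
    // several files named with the default extension.
    let dir = ListingTableUrl::parse("file:///path/to")?;
    assert!(!is_single_file_output(&dir));

    // Trailing slash marks a collection: also multi-file.
    let folder = ListingTableUrl::parse("file:///path/to/")?;
    assert!(!is_single_file_output(&folder));

    Ok(())
}
```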

datafusion/core/src/datasource/listing/url.rs

Lines changed: 63 additions & 0 deletions
```diff
@@ -190,6 +190,19 @@ impl ListingTableUrl {
         self.url.path().ends_with(DELIMITER)
     }
 
+    /// Returns the file extension of the last path segment if it exists
+    pub fn file_extension(&self) -> Option<&str> {
+        if let Some(segments) = self.url.path_segments() {
+            if let Some(last_segment) = segments.last() {
+                if last_segment.contains(".") && !last_segment.ends_with(".") {
+                    return last_segment.split('.').last();
+                }
+            }
+        }
+
+        None
+    }
+
     /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
     /// an iterator of the remaining path segments
     pub(crate) fn strip_prefix<'a, 'b: 'a>(
@@ -493,4 +506,54 @@ mod tests {
             "path not ends with / - fragment ends with / - not collection",
         );
     }
+
+    #[test]
+    fn test_file_extension() {
+        fn test(input: &str, expected: Option<&str>, message: &str) {
+            let url = ListingTableUrl::parse(input).unwrap();
+            assert_eq!(url.file_extension(), expected, "{message}");
+        }
+
+        test("https://a.b.c/path/", None, "path ends with / - not a file");
+        test(
+            "https://a.b.c/path/?a=b",
+            None,
+            "path ends with / - with query args - not a file",
+        );
+        test(
+            "https://a.b.c/path?a=b/",
+            None,
+            "path not ends with / - query ends with / but no file extension",
+        );
+        test(
+            "https://a.b.c/path/#a=b",
+            None,
+            "path ends with / - with fragment - not a file",
+        );
+        test(
+            "https://a.b.c/path#a=b/",
+            None,
+            "path not ends with / - fragment ends with / but no file extension",
+        );
+        test(
+            "file///some/path/",
+            None,
+            "file path ends with / - not a file",
+        );
+        test(
+            "file///some/path/file",
+            None,
+            "file path does not end with - no extension",
+        );
+        test(
+            "file///some/path/file.",
+            None,
+            "file path ends with . - no value after .",
+        );
+        test(
+            "file///some/path/file.ext",
+            Some("ext"),
+            "file path ends with .ext - extension is ext",
+        );
+    }
 }
```
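
The query-string and fragment test cases pass because `url::Url::path_segments` iterates only the path component. Here is a standalone approximation of the same logic on a bare `url::Url` (my own sketch, not code from the commit):

```rust
use url::Url;

// Approximation of ListingTableUrl::file_extension without the wrapper type.
fn file_extension(url: &Url) -> Option<&str> {
    // path_segments() covers only the path, so "?a=b/" and "#a=b/" in the
    // test cases above never reach this check.
    let last_segment = url.path_segments()?.last()?;
    if last_segment.contains('.') && !last_segment.ends_with('.') {
        last_segment.split('.').last()
    } else {
        None
    }
}

fn main() {
    let file = Url::parse("https://a.b.c/path/file.ext?a=b").unwrap();
    assert_eq!(file_extension(&file), Some("ext"));

    let dir = Url::parse("https://a.b.c/path/?a=b").unwrap();
    assert_eq!(file_extension(&dir), None);
}
```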
