Commit 2c3fe5f

lint
1 parent ecb5d92 commit 2c3fe5f

File tree

2 files changed: +71 −58 lines changed


datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 70 additions & 58 deletions
```diff
@@ -1248,37 +1248,40 @@ impl Stream for RepartitionStream {
             RepartitionStreamState::ReceivingFromChannel => {
                 let value = futures::ready!(self.input.recv().poll_unpin(cx));
                 match value {
-                    Some(Some(v)) => match v {
-                        Ok(RepartitionBatch::Memory(batch)) => {
-                            // Release memory and return
-                            self.reservation
-                                .lock()
-                                .shrink(batch.get_array_memory_size());
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                        Ok(RepartitionBatch::Spilled) => {
-                            // Get next file from SpillPool and create a stream
-                            let next_file = self.spill_pool.lock().take_next_file()?;
-                            match next_file {
-                                Some(spill_file) => {
-                                    // Create stream using SpillReaderStream + spawn_buffered
-                                    let stream = self
-                                        .spill_manager
-                                        .read_spill_as_stream(spill_file, None)?;
-                                    self.state =
+                    Some(Some(v)) => {
+                        match v {
+                            Ok(RepartitionBatch::Memory(batch)) => {
+                                // Release memory and return
+                                self.reservation
+                                    .lock()
+                                    .shrink(batch.get_array_memory_size());
+                                return Poll::Ready(Some(Ok(batch)));
+                            }
+                            Ok(RepartitionBatch::Spilled) => {
+                                // Get next file from SpillPool and create a stream
+                                let next_file =
+                                    self.spill_pool.lock().take_next_file()?;
+                                match next_file {
+                                    Some(spill_file) => {
+                                        // Create stream using SpillReaderStream + spawn_buffered
+                                        let stream = self
+                                            .spill_manager
+                                            .read_spill_as_stream(spill_file, None)?;
+                                        self.state =
                                             RepartitionStreamState::ReadingSpilledBatch(stream);
-                                    continue;
-                                }
-                                None => {
-                                    // No spilled files available, continue receiving from channel
-                                    continue;
+                                        continue;
+                                    }
+                                    None => {
+                                        // No spilled files available, continue receiving from channel
+                                        continue;
+                                    }
                                 }
                             }
+                            Err(e) => {
+                                return Poll::Ready(Some(Err(e)));
+                            }
                         }
-                        Err(e) => {
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                    },
+                    }
                     Some(None) => {
                         self.num_input_partitions_processed += 1;

@@ -1317,12 +1320,15 @@ impl Stream for RepartitionStream {
                             .spill_manager
                             .read_spill_as_stream(spill_file, None)?;
                         self.state =
-                            RepartitionStreamState::ReadingSpilledBatch(new_stream);
+                            RepartitionStreamState::ReadingSpilledBatch(
+                                new_stream,
+                            );
                         continue;
                     }
                     None => {
                         // No more spilled files, go back to receiving from channel
-                        self.state = RepartitionStreamState::ReceivingFromChannel;
+                        self.state =
+                            RepartitionStreamState::ReceivingFromChannel;
                         continue;
                     }
                 }

@@ -1378,37 +1384,40 @@ impl Stream for PerPartitionStream {
             RepartitionStreamState::ReceivingFromChannel => {
                 let value = futures::ready!(self.receiver.recv().poll_unpin(cx));
                 match value {
-                    Some(Some(v)) => match v {
-                        Ok(RepartitionBatch::Memory(batch)) => {
-                            // Release memory and return
-                            self.reservation
-                                .lock()
-                                .shrink(batch.get_array_memory_size());
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                        Ok(RepartitionBatch::Spilled) => {
-                            // Get next file from SpillPool and create a stream
-                            let next_file = self.spill_pool.lock().take_next_file()?;
-                            match next_file {
-                                Some(spill_file) => {
-                                    // Create stream using SpillReaderStream + spawn_buffered
-                                    let stream = self
-                                        .spill_manager
-                                        .read_spill_as_stream(spill_file, None)?;
-                                    self.state =
+                    Some(Some(v)) => {
+                        match v {
+                            Ok(RepartitionBatch::Memory(batch)) => {
+                                // Release memory and return
+                                self.reservation
+                                    .lock()
+                                    .shrink(batch.get_array_memory_size());
+                                return Poll::Ready(Some(Ok(batch)));
+                            }
+                            Ok(RepartitionBatch::Spilled) => {
+                                // Get next file from SpillPool and create a stream
+                                let next_file =
+                                    self.spill_pool.lock().take_next_file()?;
+                                match next_file {
+                                    Some(spill_file) => {
+                                        // Create stream using SpillReaderStream + spawn_buffered
+                                        let stream = self
+                                            .spill_manager
+                                            .read_spill_as_stream(spill_file, None)?;
+                                        self.state =
                                             RepartitionStreamState::ReadingSpilledBatch(stream);
-                                    continue;
-                                }
-                                None => {
-                                    // No spilled files available, continue receiving from channel
-                                    continue;
+                                        continue;
+                                    }
+                                    None => {
+                                        // No spilled files available, continue receiving from channel
+                                        continue;
+                                    }
                                 }
                             }
+                            Err(e) => {
+                                return Poll::Ready(Some(Err(e)));
+                            }
                         }
-                        Err(e) => {
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                    },
+                    }
                     Some(None) => {
                         // Input partition has finished sending batches
                         return Poll::Ready(None);

@@ -1436,12 +1445,15 @@ impl Stream for PerPartitionStream {
                             .spill_manager
                             .read_spill_as_stream(spill_file, None)?;
                         self.state =
-                            RepartitionStreamState::ReadingSpilledBatch(new_stream);
+                            RepartitionStreamState::ReadingSpilledBatch(
+                                new_stream,
+                            );
                         continue;
                     }
                     None => {
                         // No more spilled files, go back to receiving from channel
-                        self.state = RepartitionStreamState::ReceivingFromChannel;
+                        self.state =
+                            RepartitionStreamState::ReceivingFromChannel;
                         continue;
                     }
                 }
```
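For orientation, the sketch below summarizes the two-state machine these hunks reformat (the lint commit changes layout only, not behavior). The variant names and the spill-reading flow are read off the diff itself; the payload type and everything else here is an assumption, not the actual definition in `repartition/mod.rs`.

```rust
use datafusion::physical_plan::SendableRecordBatchStream;

// Hypothetical reconstruction of the state machine driven by the poll
// loops above; only the names visible in the diff are taken from the source.
#[allow(dead_code)]
enum RepartitionStreamState {
    // Poll the channel: a RepartitionBatch::Memory(batch) is returned
    // directly (after shrinking the memory reservation), while a
    // RepartitionBatch::Spilled marker asks the SpillPool for the next
    // file and switches to ReadingSpilledBatch.
    ReceivingFromChannel,
    // Drain record batches from one spill file (opened via
    // SpillManager::read_spill_as_stream); when it is exhausted, move to
    // the next spill file or fall back to ReceivingFromChannel.
    ReadingSpilledBatch(SendableRecordBatchStream),
}
```

Both `RepartitionStream` and `PerPartitionStream` run the same loop; the only behavioral difference visible in this diff is the `Some(None)` (channel closed) arm, where the former counts finished input partitions and the latter ends the stream.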

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -114,6 +114,7 @@ The following configuration settings are available:
 | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
 | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
+| datafusion.execution.max_spill_file_size_bytes | 104857600 | Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec, SortExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation (especially in LIFO mode where files are truncated after reading). Default: 100 MB |
 | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
 | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
 | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
```
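As a usage note, the new `datafusion.execution.max_spill_file_size_bytes` key should be settable like any other execution option once this commit lands. The snippet below is a sketch, not part of the commit; it takes the key from the table above and assumes `SessionConfig::set_u64`, one of DataFusion's typed config setters.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Sketch: rotate spill files at 64 MB instead of the 100 MB default.
    // The key comes from the docs table above; set_u64 is assumed to be
    // one of SessionConfig's typed setters.
    let config = SessionConfig::new().set_u64(
        "datafusion.execution.max_spill_file_size_bytes",
        64 * 1024 * 1024,
    );
    let ctx = SessionContext::new_with_config(config);
    // ... register tables and run queries on `ctx` as usual ...
    drop(ctx);
}
```

The same override should also work from SQL, e.g. `SET datafusion.execution.max_spill_file_size_bytes = 67108864;`.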
