Skip to content

Commit af6b411

Browse files
committed
clippy
1 parent 569ecd5 commit af6b411

File tree

3 files changed

+42
-38
lines changed

3 files changed

+42
-38
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 28 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ type MaybeBatch = Option<Result<RepartitionBatch>>;
8282
type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>;
8383
type InputPartitionsToCurrentPartitionReceiver = Vec<DistributionReceiver<MaybeBatch>>;
8484

85+
/// Output channel with its associated memory reservation and spill pool
86+
#[derive(Clone)]
87+
struct OutputChannel {
88+
sender: DistributionSender<MaybeBatch>,
89+
reservation: SharedMemoryReservation,
90+
spill_pool: Arc<Mutex<SpillPool>>,
91+
}
92+
8593
/// Channels and resources for a single output partition
8694
struct PartitionChannels {
8795
/// Senders for each input partition to send data to this output partition
@@ -242,12 +250,13 @@ impl RepartitionExecState {
242250
));
243251

244252
// Create SpillPool with configured max file size
245-
let max_file_size = context.session_config().options().execution.max_spill_file_size_bytes;
246-
let spill_pool = SpillPool::new(
247-
max_file_size,
248-
Arc::clone(&spill_manager),
249-
input.schema(),
250-
);
253+
let max_file_size = context
254+
.session_config()
255+
.options()
256+
.execution
257+
.max_spill_file_size_bytes;
258+
let spill_pool =
259+
SpillPool::new(max_file_size, Arc::clone(&spill_manager), input.schema());
251260

252261
channels.insert(
253262
partition,
@@ -270,11 +279,11 @@ impl RepartitionExecState {
270279
.map(|(partition, channels)| {
271280
(
272281
*partition,
273-
(
274-
channels.tx[i].clone(),
275-
Arc::clone(&channels.reservation),
276-
Arc::clone(&channels.spill_pool),
277-
),
282+
OutputChannel {
283+
sender: channels.tx[i].clone(),
284+
reservation: Arc::clone(&channels.reservation),
285+
spill_pool: Arc::clone(&channels.spill_pool),
286+
},
278287
)
279288
})
280289
.collect();
@@ -291,9 +300,7 @@ impl RepartitionExecState {
291300
let wait_for_task = SpawnedTask::spawn(RepartitionExec::wait_for_task(
292301
input_task,
293302
txs.into_iter()
294-
.map(|(partition, (tx, _reservation, _spill_manager))| {
295-
(partition, tx)
296-
})
303+
.map(|(partition, channel)| (partition, channel.sender))
297304
.collect(),
298305
));
299306
spawned_tasks.push(wait_for_task);
@@ -758,12 +765,7 @@ impl ExecutionPlan for RepartitionExec {
758765
.remove(&partition)
759766
.expect("partition not used yet");
760767

761-
(
762-
rx,
763-
reservation,
764-
spill_pool,
765-
Arc::clone(&state.abort_helper),
766-
)
768+
(rx, reservation, spill_pool, Arc::clone(&state.abort_helper))
767769
};
768770

769771
trace!(
@@ -1051,14 +1053,7 @@ impl RepartitionExec {
10511053
/// txs hold the output sending channels for each output partition
10521054
async fn pull_from_input(
10531055
mut stream: SendableRecordBatchStream,
1054-
mut output_channels: HashMap<
1055-
usize,
1056-
(
1057-
DistributionSender<MaybeBatch>,
1058-
SharedMemoryReservation,
1059-
Arc<Mutex<SpillPool>>,
1060-
),
1061-
>,
1056+
mut output_channels: HashMap<usize, OutputChannel>,
10621057
partitioning: Partitioning,
10631058
metrics: RepartitionMetrics,
10641059
) -> Result<()> {
@@ -1090,30 +1085,28 @@ impl RepartitionExec {
10901085

10911086
let timer = metrics.send_time[partition].timer();
10921087
// if there is still a receiver, send to it
1093-
if let Some((tx, reservation, spill_pool)) =
1094-
output_channels.get_mut(&partition)
1095-
{
1088+
if let Some(channel) = output_channels.get_mut(&partition) {
10961089
let (batch_to_send, is_memory_batch) =
1097-
match reservation.lock().try_grow(size) {
1090+
match channel.reservation.lock().try_grow(size) {
10981091
Ok(_) => {
10991092
// Memory available - send in-memory batch
11001093
(RepartitionBatch::Memory(batch), true)
11011094
}
11021095
Err(_) => {
11031096
// We're memory limited - spill to SpillPool
11041097
// SpillPool handles file handle reuse and rotation
1105-
spill_pool.lock().push_batch(&batch)?;
1098+
channel.spill_pool.lock().push_batch(&batch)?;
11061099

11071100
// Send marker indicating batch was spilled
11081101
(RepartitionBatch::Spilled, false)
11091102
}
11101103
};
11111104

1112-
if tx.send(Some(Ok(batch_to_send))).await.is_err() {
1105+
if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
11131106
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
11141107
// Only shrink memory if it was a memory batch
11151108
if is_memory_batch {
1116-
reservation.lock().shrink(size);
1109+
channel.reservation.lock().shrink(size);
11171110
}
11181111
output_channels.remove(&partition);
11191112
}

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,20 @@ impl SpillPool {
204204

205205
/// Returns the number of files currently in the pool
206206
pub fn file_count(&self) -> usize {
207-
self.files.len() + if self.current_write_file.is_some() { 1 } else { 0 }
207+
self.files.len()
208+
+ if self.current_write_file.is_some() {
209+
1
210+
} else {
211+
0
212+
}
208213
}
209214

210215
/// Returns the total number of unread batches across all files
211216
pub fn batch_count(&self) -> usize {
212-
self.files.iter().map(|f| f.remaining_batches()).sum::<usize>()
217+
self.files
218+
.iter()
219+
.map(|f| f.remaining_batches())
220+
.sum::<usize>()
213221
+ self.current_batch_count
214222
}
215223

@@ -385,7 +393,8 @@ mod tests {
385393
let env = Arc::new(RuntimeEnv::default());
386394
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
387395
let schema = create_test_schema();
388-
let spill_manager = Arc::new(SpillManager::new(env, metrics, Arc::clone(&schema)));
396+
let spill_manager =
397+
Arc::new(SpillManager::new(env, metrics, Arc::clone(&schema)));
389398

390399
SpillPool::new(max_file_size, spill_manager, schema)
391400
}

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ datafusion.execution.keep_partition_by_columns false
223223
datafusion.execution.listing_table_factory_infer_partitions true
224224
datafusion.execution.listing_table_ignore_subdirectory true
225225
datafusion.execution.max_buffered_batches_per_output_file 2
226+
datafusion.execution.max_spill_file_size_bytes 104857600
226227
datafusion.execution.meta_fetch_concurrency 32
227228
datafusion.execution.minimum_parallel_output_files 4
228229
datafusion.execution.objectstore_writer_buffer_size 10485760
@@ -343,6 +344,7 @@ datafusion.execution.keep_partition_by_columns false Should DataFusion keep the
343344
datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
344345
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
345346
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
347+
datafusion.execution.max_spill_file_size_bytes 104857600 Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec, SortExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation (especially in LIFO mode where files are truncated after reading). Default: 100 MB
346348
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
347349
datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
348350
datafusion.execution.objectstore_writer_buffer_size 10485760 Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point.

0 commit comments

Comments
 (0)