Skip to content
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
69ab5d2
implement SpillingPool
adriangb Oct 21, 2025
2c9bb58
clippy
adriangb Oct 21, 2025
bfabc5e
use buffered stream
adriangb Oct 21, 2025
f183b9e
lint
adriangb Oct 21, 2025
0cc7a63
rework
adriangb Oct 21, 2025
6dd6b6d
Add some tests
adriangb Oct 21, 2025
b25bdfa
fix lints
adriangb Oct 22, 2025
03ffc3c
bugfix
adriangb Oct 22, 2025
966a213
address pr feedback
adriangb Oct 22, 2025
1dbc730
fix build
adriangb Oct 23, 2025
e2ed52d
fix dropped task
adriangb Oct 23, 2025
c850240
updarte docstrings
adriangb Oct 23, 2025
303258e
remove wrapper struct
adriangb Oct 23, 2025
ab7f350
hide stream behind a trait to avoid making more public stuff
adriangb Oct 23, 2025
9033e84
wip on cleanup
adriangb Oct 24, 2025
cccbd73
add a detailed test
adriangb Oct 24, 2025
f43bbb6
Add slt test
adriangb Oct 24, 2025
5bf14ec
use 128MB as the default spill file size
adriangb Oct 24, 2025
39577f0
make SpillPool pub(crate)
adriangb Oct 24, 2025
468ad57
fmt
adriangb Oct 24, 2025
89dc51c
lint
adriangb Oct 24, 2025
54652ef
update slt
adriangb Oct 24, 2025
d7af8ef
remove test
adriangb Oct 30, 2025
0f6c83a
clippy
adriangb Oct 30, 2025
93779c0
Update datafusion/physical-plan/src/spill/spill_pool.rs
adriangb Nov 2, 2025
192f3ef
update docs
adriangb Nov 2, 2025
374319b
Apply suggestion from @2010YOUY01
adriangb Nov 2, 2025
b091574
lints
adriangb Nov 4, 2025
c34a5df
Add state machine for stream polling
adriangb Nov 4, 2025
e9aedf6
wip
adriangb Nov 5, 2025
30bcfc9
rewrite spilling pool, incorporate new architecture
adriangb Nov 5, 2025
df58e8b
re-incorporate gated channels
adriangb Nov 5, 2025
66790ec
add lots of docs
adriangb Nov 5, 2025
3f40fdb
add metrics for file and byte counts to tests
adriangb Nov 5, 2025
f61148a
Update datafusion/physical-plan/src/repartition/mod.rs
adriangb Nov 5, 2025
2dcf3f5
Update datafusion/physical-plan/src/repartition/mod.rs
adriangb Nov 5, 2025
e31f827
clippy
adriangb Nov 5, 2025
8cba32e
tweak docstrings
adriangb Nov 6, 2025
f1de9da
fmt
adriangb Nov 6, 2025
d05b0cf
fix and track disk usage
adriangb Nov 7, 2025
23ebd30
clean up test
adriangb Nov 7, 2025
34eb564
fix docs
adriangb Nov 7, 2025
195f72f
use a single waker
adriangb Nov 7, 2025
86364b2
fix test, make MPSC, docs, cleanup
adriangb Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,23 @@ config_namespace! {
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

/// Maximum size in bytes for individual spill files before rotating to a new file.
///
/// When operators spill data to disk (e.g., RepartitionExec), they write
/// multiple batches to the same file until this size limit is reached, then rotate
/// to a new file. This reduces syscall overhead compared to one-file-per-batch
/// while preventing files from growing too large.
///
/// A larger value reduces file creation overhead but may hold more disk space.
/// A smaller value creates more files but allows finer-grained space reclamation
/// as files can be deleted once fully consumed.
///
/// Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators
/// may create spill files larger than the limit.
///
/// Default: 128 MB
pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024

/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32

Expand Down
37 changes: 30 additions & 7 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,12 @@ impl DiskManager {

let dir_index = rng().random_range(0..local_dirs.len());
Ok(RefCountedTempFile {
_parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Arc::new(
Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
),
current_file_disk_usage: 0,
disk_manager: Arc::clone(self),
})
Expand All @@ -311,26 +313,47 @@ impl DiskManager {
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
/// This ensures the disk manager can properly enforce usage limits configured by
/// [`DiskManager::with_max_temp_directory_size`].
///
/// This type is Clone-able, allowing multiple references to the same underlying file.
/// The file is deleted only when the last reference is dropped.
///
/// The parent temporary directory is also kept alive as long as any reference to
/// this file exists, preventing premature cleanup of the directory.
///
/// Once all references to this file are dropped, the file is deleted, and the
/// disk usage is subtracted from the disk manager's total.
#[derive(Debug)]
pub struct RefCountedTempFile {
/// The reference to the directory in which temporary files are created to ensure
/// it is not cleaned up prior to the NamedTempFile
_parent_temp_dir: Arc<TempDir>,
tempfile: NamedTempFile,
parent_temp_dir: Arc<TempDir>,
/// The underlying temporary file, wrapped in Arc to allow cloning
tempfile: Arc<NamedTempFile>,
/// Tracks the current disk usage of this temporary file. See
/// [`Self::update_disk_usage`] for more details.
current_file_disk_usage: u64,
/// The disk manager that created and manages this temporary file
disk_manager: Arc<DiskManager>,
}

impl Clone for RefCountedTempFile {
fn clone(&self) -> Self {
Self {
parent_temp_dir: Arc::clone(&self.parent_temp_dir),
tempfile: Arc::clone(&self.tempfile),
current_file_disk_usage: self.current_file_disk_usage,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we allow clone, the disk usage tracking seems to be a issue. See below update_disk_usage() and Drop

disk_manager: Arc::clone(&self.disk_manager),
}
}
}

impl RefCountedTempFile {
pub fn path(&self) -> &Path {
self.tempfile.path()
}

pub fn inner(&self) -> &NamedTempFile {
&self.tempfile
self.tempfile.as_ref()
}

/// Updates the global disk usage counter after modifications to the underlying file.
Expand Down
Loading