Add a SpillingPool to manage collections of spill files #18207
Conversation
Marking as draft for now. Open to input, but it needs a bit more work. I'm still familiarizing myself with the spilling infrastructure.
This PR sets a size limit on spill files: when the size exceeds the threshold, the spiller rotates to a new file. I'm wondering why this design was chosen. The spill writer and reader can now do streaming reads/writes, so a large spill file usually isn't an issue, unless more parallelism is needed somewhere.
The issue with using a single FIFO file is that you accumulate dead data, bloating disk usage considerably. The idea is to cap that at say 100MB and then start a new file so that once all of the original file has been consumed we can garbage collect it. |
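A minimal sketch of that capped-rotation idea in plain Rust, with byte counts standing in for real spill files (all names here are illustrative, not the actual DataFusion API): once appending a batch would push the current file past the cap, the file is sealed and writing rotates to a fresh one, so sealed files at the front of the FIFO can be garbage collected without waiting for the whole queue to drain.

```rust
use std::collections::VecDeque;

/// One spill file in the pool (sizes only; the real files hold Arrow IPC data).
struct SpillFile {
    bytes: usize,
    sealed: bool,
}

/// FIFO pool that rotates to a new file once the current one reaches the cap.
struct SpillPool {
    max_file_size: usize,
    files: VecDeque<SpillFile>, // front = next to read, back = being written
}

impl SpillPool {
    fn new(max_file_size: usize) -> Self {
        Self { max_file_size, files: VecDeque::new() }
    }

    fn file_count(&self) -> usize {
        self.files.len()
    }

    /// Append a batch of `batch_bytes`; seal the current file and rotate
    /// when adding the batch would exceed the size cap.
    fn write(&mut self, batch_bytes: usize) {
        let fits = matches!(
            self.files.back(),
            Some(f) if !f.sealed && f.bytes + batch_bytes <= self.max_file_size
        );
        if !fits {
            if let Some(f) = self.files.back_mut() {
                f.sealed = true; // finish the old file so it can be reclaimed
            }
            self.files.push_back(SpillFile { bytes: 0, sealed: false });
        }
        self.files.back_mut().unwrap().bytes += batch_bytes;
    }

    /// Reclaim the oldest file once it is sealed (in the real pool, once it
    /// has also been fully read), returning the bytes freed. This is the
    /// early garbage collection described above.
    fn reclaim_front(&mut self) -> Option<usize> {
        if self.files.front().map_or(false, |f| f.sealed) {
            self.files.pop_front().map(|f| f.bytes)
        } else {
            None
        }
    }
}
```

With a 100-byte cap, two 60-byte batches land in two files, and the first (sealed) file can be reclaimed while the second is still being written.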
@2010YOUY01 let me know if that makes sense; there's an example of this issue in #18011
Pull Request Overview
This PR introduces a SpillPool abstraction to centralize the management of spill files with FIFO semantics. The pool handles file rotation, batching multiple record batches into single files up to a configurable size limit, and provides streaming read access to spilled data.
Key changes:
- Adds a new `SpillPool` module with FIFO queue semantics for managing spill files
- Integrates `SpillPool` into `RepartitionExec` to replace the previous one-file-per-batch approach
- Adds a new configuration option `max_spill_file_size_bytes` (default 100MB) to control when spill files rotate
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/spill/spill_pool.rs | New module implementing SpillPool and SpillPoolStream with comprehensive tests |
| datafusion/physical-plan/src/spill/mod.rs | Exports the new spill_pool module |
| datafusion/physical-plan/src/repartition/mod.rs | Refactored to use SpillPool instead of one-file-per-batch spilling |
| datafusion/common/src/config.rs | Adds max_spill_file_size_bytes configuration option |
| docs/source/user-guide/configs.md | Documents the new max_spill_file_size_bytes configuration |
| datafusion/sqllogictest/test_files/information_schema.slt | Updates test expectations to include new configuration option |
This makes a lot of sense; operators should release disk usage sooner if possible. I will review it soon.
Pull Request Overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.
Thank you, looks good in general.
I think it needs some additional test coverage:
- e2e tests, potentially a query that triggers spilling in `RepartitionExec`. I think we can also do a quick benchmark on it to see how things work.
- #18207 (comment)
I also left some suggestions to simplify the implementation, but they're optional.
datafusion/common/src/config.rs
Outdated
| /// | ||
| /// A larger value reduces file creation overhead but may hold more disk space. | ||
| /// A smaller value creates more files but allows finer-grained space reclamation | ||
| /// (especially in LIFO mode where files are truncated after reading). |
Now that we're reclaiming disk space at 'chunked file' granularity, perhaps this truncation approach doesn't have to be mentioned, since it doesn't have a real usage yet.
Yep, it was leftover from a previous implementation.
| /// Size of current write file in bytes (estimated) | ||
| current_write_size: usize, | ||
| /// Number of batches written to current file | ||
| current_batch_count: usize, |
I think we can track them inside InProgressSpillFile, and expose an API. This approach can simplify SpillPool a bit.
done
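As a sketch of that suggestion (hypothetical names and a byte count in place of real record batches, not the actual `InProgressSpillFile` API): the in-progress file tracks its own estimated size and batch count and exposes read-only accessors, so the pool's rotation check no longer needs parallel bookkeeping fields.

```rust
/// Simplified stand-in for an in-progress spill file that owns its own
/// counters instead of having the pool track them externally.
struct InProgressSpillFile {
    estimated_size: usize,
    batch_count: usize,
}

impl InProgressSpillFile {
    fn new() -> Self {
        Self { estimated_size: 0, batch_count: 0 }
    }

    /// Record a batch write; the real method would also serialize the batch.
    fn append_batch(&mut self, batch_bytes: usize) {
        self.estimated_size += batch_bytes;
        self.batch_count += 1;
    }

    /// Estimated bytes written so far (what the pool's rotation check reads).
    fn estimated_size(&self) -> usize {
        self.estimated_size
    }

    fn batch_count(&self) -> usize {
        self.batch_count
    }
}
```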
| /// SpillManager for creating files and tracking metrics | ||
| spill_manager: Arc<SpillManager>, | ||
| /// Schema for batches (used by SpillPoolStream to implement RecordBatchStream) | ||
| schema: SchemaRef, |
To avoid duplication, the schema inside spill_manager can be used instead.
👍🏻 done
| /// Shared reference to the spill pool | ||
| spill_pool: Arc<Mutex<SpillPool>>, | ||
| /// SpillManager for creating streams from spill files | ||
| spill_manager: Arc<SpillManager>, |
Is it possible to use the spill_manager inside spill_pool, and eliminate this field?
| // Input finished and no more spilled data - we're done | ||
| return Poll::Ready(None); | ||
| } | ||
| // Otherwise check the channel |
PerPartitionStream is for the order-preserving case of RepartitionExec, and it seems a bit tricky to get the order correct. I recommend finding the existing tests for order-preserving repartition and adding spilling to them.
| // Append batch to current file | ||
| if let Some(ref mut file) = self.current_write_file { | ||
| file.append_batch(batch)?; |
A potential follow-up to do: #18261
| # End repartition on empty columns test | ||
|
|
||
| # Start spilling tests |
This test passes on main but fails on datafusion-cli v50:
❯ datafusion-cli
DataFusion CLI v50.0.0
> CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
a0 INTEGER,
a INTEGER,
b INTEGER,
c INTEGER,
d INTEGER
)
STORED AS CSV
WITH ORDER (a ASC, b ASC, c ASC)
LOCATION 'datafusion/core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');
SET datafusion.runtime.memory_limit = '12K';
EXPLAIN ANALYZE
SELECT SUM(a) OVER(partition by a, b order by c) as sum1,
SUM(a) OVER(partition by b, a order by c) as sum2,
SUM(a) OVER(partition by a, d order by b) as sum3,
SUM(a) OVER(partition by d order by a) as sum4
FROM annotated_data_infinite2;
0 row(s) fetched.
Elapsed 0.026 seconds.
0 row(s) fetched.
Elapsed 0.001 seconds.
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
RepartitionExec[Merge 11]#85(can spill: false) consumed 1896.0 B, peak 1896.0 B,
RepartitionExec[Merge 5]#73(can spill: false) consumed 1448.0 B, peak 1448.0 B,
RepartitionExec[Merge 3]#59(can spill: false) consumed 1384.0 B, peak 1384.0 B,
RepartitionExec[Merge 1]#56(can spill: false) consumed 1304.0 B, peak 1304.0 B,
RepartitionExec[8]#48(can spill: false) consumed 1216.0 B, peak 1856.0 B.
Error: Failed to allocate additional 240.0 B for RepartitionExec[Merge 6] with 0.0 B already allocated for this reservation - 8.0 B remain available for the total pool
It's not this PR that enabled it to pass, it was #18014, but worth adding anyway.
@2010YOUY01 this is ready for review now.

There are existing unit-style tests that manually push data through some partitions but not others and demonstrate that, if there were some imbalance, the spilling we've added to RepartitionExec would allow the query to continue instead of failing. I added an integration-style test that triggered this with a real-world query (one that was already used in other SLT tests, actually), but I had to remove it because the order was not deterministic and I could not find a way to make the order deterministic and still get spilling. However, it should be reproducible outside of this change if you want.

One thing I'll note about this change specifically and the existing spilling infrastructure that was a bit disappointing: it seems it's not possible to read from and write to the same spill file. That is, we need to finish writing a spill file before we can read from it. This means the read side needs to wait for a spill file to be full or for the stream to end before it can start reading, which is bound to add latency. I don't think that is a limitation of this change, but rather a limitation of the existing `InProgressSpillFile`, etc. I decided not to fight it any more, but if you think this interpretation is wrong and it is possible to use an active file as a FIFO read/write queue, I'm open to exploring more.

Overall I'm not 100% certain what positive impact this will have in the real world; it's very hard to trigger specific memory scenarios in a real query. However, I think this is an improvement on the per-batch spilling that we added in #18014, which was in turn an improvement on the query failing. I also think the structures added here are pretty isolated but also maybe reusable and nicely packaged up. They'd be easy to use in other operators if they prove to work well, or easy to rip out of RepartitionExec if they don't, so there is little harm in adding them. I see this as part of the strategy of "get as many operators to implement spilling as possible", which is complementary to other interesting big-picture discussions around memory management.

Personally I would like to close the chapter on RepartitionExec and move on to thinking about those other big-picture bits.
LGTM, thank you!
> One thing I'll note about this change specifically and the existing spilling infrastructure that was a bit disappointing: it seems like it's not possible to read and write from the same spill file. That is, we need to finish writing a spill file before we can read from it. This means that the read side needs to wait for a spill file to be full or for the stream to end before it can start reading, which is bound to add latency. I don't think that is a limitation of this change, rather a limitation of the existing `InProgressSpillFile`, etc.
Now the spilling operators mostly use an external-sort-based solution, so their logic follows a 'first write stage, then read stage' pattern, and the existing utilities are designed for that, so they might not be easy to use for the interleaving writes-and-reads pattern. But I think the solution in this PR is good enough and we won't need further changes.
Also, I still don't fully get why we can't let RepartitionExec run with a bounded buffer using some backpressure mechanism. Maybe that could be a long-term solution to explore.
I think this SpillingPool with the rotating-spill-file feature is quite useful for external-sort-based operators: it makes disk usage more efficient if the query includes multiple stages of spilling executors, or if multiple query instances share the same disk. However, I don't think that's the top priority now; we might want to keep the solution simple and work on the stability issues first.
Overall I think it's a good idea to close the chapter on RepartitionExec. Thanks again for the amazing work!
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! SpillPool: A reusable abstraction for managing spill files with FIFO semantics. |
This doc is great! Maybe we can merge this file-level comment into the comment on struct SpillPool? It seems there are some duplicates.
| //! - **Sequential reading**: Uses IPC Stream format's natural sequential access pattern | ||
| //! - **Automatic cleanup**: Files are deleted once fully consumed | ||
| //! | ||
| //! # Usage Example |
I suggest also explaining that the reader is allowed to poll before the writer finishes writing; in the example we can have the writer writing concurrently with the reader.
Co-authored-by: Yongting You <[email protected]>
@2010YOUY01 I haven't merged this because I dove in to do another pass and found various issues / bugs. I'm working on a reworked version which has nicer APIs, fixes the bugs and manages to enable reading of a spill file as it's being written (making sure there is no latency hit for spilling other than the IO cost). I'll ping back once I think this is ready for another review (sadly I think that will be warranted). Thank you so much for your review and patience 🙏🏻
Okay @2010YOUY01 this is ready for review 😄
Pull Request Overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Co-authored-by: Copilot <[email protected]>
| /// - **Memory management**: Files automatically rotate based on size limits | ||
| /// - **Concurrent I/O**: Reader and writer operate independently with async coordination | ||
| /// - **FIFO semantics**: Batches are consumed in the exact order they were written | ||
| pub fn channel( |
Correct me if I'm not understanding this correctly: this is an SPSC channel, and the writer and the reader can operate concurrently inside the same in-progress file, like the following timeline:
- writer writes batch `B0` to `F1`
- writer writes batch `B1` to `F1`
- reader reads `B0`
- reader reads `B1`; no more batches to read -> waits on the waker
- writer writes batch `B2` and finishes `F1`, then continues writing to a new file: it wakes up the waiting reader, then writes `B3` to `F2`
- reader wakes up and reads `B2`, then drops `F1` and releases the resources.
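That timeline can be sketched in plain Rust using OS threads and a `Condvar` as a stand-in for the async `Waker` (illustrative only; the real implementation is async and file-backed, and batch ids here stand in for record batches): the reader parks when it catches up to the writer, and each write wakes it.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Shared SPSC state: (queued batch ids, writer_finished flag).
struct Channel {
    state: Mutex<(VecDeque<i32>, bool)>,
    ready: Condvar,
}

fn spsc_demo() -> Vec<i32> {
    let ch = Arc::new(Channel {
        state: Mutex::new((VecDeque::new(), false)),
        ready: Condvar::new(),
    });

    let writer = {
        let ch = Arc::clone(&ch);
        thread::spawn(move || {
            for b in 0..4 {
                let mut s = ch.state.lock().unwrap();
                s.0.push_back(b); // writer appends batch Bn
                ch.ready.notify_one(); // wake a parked reader (the "waker")
            }
            ch.state.lock().unwrap().1 = true; // no more batches will come
            ch.ready.notify_one();
        })
    };

    // Reader: consume in FIFO order, parking whenever it catches up.
    let mut out = Vec::new();
    let mut s = ch.state.lock().unwrap();
    loop {
        if let Some(b) = s.0.pop_front() {
            out.push(b);
        } else if s.1 {
            break; // writer finished and the queue is drained
        } else {
            s = ch.ready.wait(s).unwrap(); // park until the writer wakes us
        }
    }
    drop(s);
    writer.join().unwrap();
    out
}
```

Regardless of how the two threads interleave, the reader always observes the batches in write order, which is the FIFO guarantee the channel API promises.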
Yes, SPSC channel is the right terminology, and given that the new API is very channel-like, I think I can update the docs to reflect that.
| /// # Architecture | ||
| /// | ||
| /// ```text | ||
| /// ┌─────────────────────────────────────────────────────────────────────────┐ |
This figure looks 👍🏼. Is it AI-generated? I've tried generating ASCII art with ChatGPT several times, but all attempts failed, so I still draw them manually.
A mix. Sonnet 4.5 always gets the formatting slightly wrong, but I still find it easier to have it generate them and review / fix them up than to draw by hand in some cases. Sometimes it fails miserably and you have to do it yourself...
@2010YOUY01 just wanted to check: did you want me to leave this up for review and approve again, or is the review in #18207 (review) plus the existing approval a sign to merge it?
I have gone through another review pass; I think it's good to go after the disk usage counting issue is fixed. Thanks again!
| Self { | ||
| parent_temp_dir: Arc::clone(&self.parent_temp_dir), | ||
| tempfile: Arc::clone(&self.tempfile), | ||
| current_file_disk_usage: self.current_file_disk_usage, |
If we allow clone, disk usage tracking seems to be an issue. See update_disk_usage() and Drop below.
| let input_partitions = vec![partition]; | ||
|
|
||
| // Use RoundRobinBatch to ensure predictable ordering | ||
| let partitioning = Partitioning::RoundRobinBatch(2); |
Should we also test `Partitioning::Hash`?
| /// configured in [`channel`]. When dropped, it finalizes the current file so readers | ||
| /// can access all written data. | ||
| pub struct SpillPoolWriter { | ||
| /// Maximum size in bytes before rotating to a new file |
| /// Maximum size in bytes before rotating to a new file | |
| /// Maximum size in bytes before rotating to a new file. | |
| /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`. |
| /// | ||
| /// This is the recommended way to create a spill pool. The writer has exclusive | ||
| /// write access, and the reader can consume batches in FIFO order. The reader | ||
| /// can start reading immediately while the writer continues to write more data. |
| /// can start reading immediately while the writer continues to write more data. | |
| /// can start reading immediately after the writer appends a batch to the spill file | |
| /// , without waiting for the file to be sealed, while the writer continues to write | |
| /// additional data. |
| /// Whether the writer has finished writing to this file | ||
| writer_finished: bool, | ||
| /// Wakers for readers waiting on this specific file | ||
| wakers: Vec<Waker>, |
If it is only intended to have a single reader, do we still need a Vec?
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_reader_catches_up_to_writer() -> Result<()> { |
I just want to point out an 'unsafe' implementation here; other reviewers might want to double-check it.
Tailing (a concurrent reader and writer on the same file) within the same IPC Stream file should be a risky operation; I think it's not allowed by the spec: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
But we have external coordination to ensure:
- Batch-level mutex: the reader won't see half-written records
- The reader won't call `reader.next()` if it has already reached the last batch and the writer plans to append more batches later. Here is an example:
# 1 reader and 1 writer operating concurrently on the same file
T0: writer appends batch `B0` to spill file `F0`
T1: reader calls `.next()` to read `B0`, and we have coordination to ensure it won't call `.next()` again before the writer appends more batches; otherwise I think it can trigger an error.
T2: writer appends `B1`
...
Also, this test has triggered the edge case, so I think the implementation is good.
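To make that coordination invariant concrete, here is a hedged Rust sketch (hypothetical names, not the PR's actual types): the reader only advances while a fully-committed batch remains according to a counter updated under the same lock as the writes, and otherwise reports that it must wait (or that the stream is finished) instead of touching the partially-written IPC stream.

```rust
use std::sync::{Arc, Mutex};

/// State shared between writer and tailing reader, updated under the same
/// lock as the file writes.
struct Shared {
    batches_written: usize,
    writer_finished: bool,
}

#[derive(Debug, PartialEq)]
enum ReadState {
    Batch(usize), // index of a batch that is safe to read
    Pending,      // caught up to the writer: must wait, not call next()
    Finished,     // writer done and everything consumed
}

struct TailReader {
    shared: Arc<Mutex<Shared>>,
    batches_read: usize,
}

impl TailReader {
    /// The guard that keeps tailing safe: only advance when a
    /// fully-committed batch remains ahead of the read position.
    fn try_next(&mut self) -> ReadState {
        let s = self.shared.lock().unwrap();
        if self.batches_read < s.batches_written {
            let idx = self.batches_read;
            self.batches_read += 1; // real code would call reader.next() here
            ReadState::Batch(idx)
        } else if s.writer_finished {
            ReadState::Finished
        } else {
            ReadState::Pending
        }
    }
}
```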
| } | ||
| } | ||
|
|
||
| #[cfg(test)] |
Love the test coverage 💯
Addresses #18014 (comment); potentially paves the way to solving #18011 for other operators as well.