1717
1818//! Defines the spilling functions
1919
20+ pub ( crate ) mod in_progress_spill_file;
21+ pub ( crate ) mod spill_manager;
22+
2023use std:: fs:: File ;
2124use std:: io:: BufReader ;
2225use std:: path:: { Path , PathBuf } ;
2326use std:: ptr:: NonNull ;
24- use std:: sync:: Arc ;
2527
2628use arrow:: array:: ArrayData ;
2729use arrow:: datatypes:: { Schema , SchemaRef } ;
2830use arrow:: ipc:: { reader:: StreamReader , writer:: StreamWriter } ;
2931use arrow:: record_batch:: RecordBatch ;
30- use datafusion_execution:: runtime_env:: RuntimeEnv ;
3132use log:: debug;
3233use tokio:: sync:: mpsc:: Sender ;
3334
@@ -36,7 +37,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
3637use datafusion_execution:: memory_pool:: human_readable_size;
3738use datafusion_execution:: SendableRecordBatchStream ;
3839
39- use crate :: metrics:: SpillMetrics ;
4040use crate :: stream:: RecordBatchReceiverStream ;
4141
4242/// Read spilled batches from the disk
@@ -229,182 +229,21 @@ impl IPCStreamWriter {
229229 }
230230}
231231
232- /// The `SpillManager` is responsible for the following tasks:
233- /// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
234- /// - Updating the associated metrics.
235- ///
236- /// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
237- /// For example, all records within the same spill file are ordered according to a specific order.
238- #[ derive( Debug , Clone ) ]
239- pub ( crate ) struct SpillManager {
240- env : Arc < RuntimeEnv > ,
241- metrics : SpillMetrics ,
242- schema : SchemaRef ,
243- /// Number of batches to buffer in memory during disk reads
244- batch_read_buffer_capacity : usize ,
245- // TODO: Add general-purpose compression options
246- }
247-
248- impl SpillManager {
249- pub fn new ( env : Arc < RuntimeEnv > , metrics : SpillMetrics , schema : SchemaRef ) -> Self {
250- Self {
251- env,
252- metrics,
253- schema,
254- batch_read_buffer_capacity : 2 ,
255- }
256- }
257-
258- /// Creates a temporary file for in-progress operations, returning an error
259- /// message if file creation fails. The file can be used to append batches
260- /// incrementally and then finish the file when done.
261- pub fn create_in_progress_file (
262- & self ,
263- request_msg : & str ,
264- ) -> Result < InProgressSpillFile > {
265- let temp_file = self . env . disk_manager . create_tmp_file ( request_msg) ?;
266- Ok ( InProgressSpillFile :: new ( Arc :: new ( self . clone ( ) ) , temp_file) )
267- }
268-
269- /// Spill input `batches` into a single file in a atomic operation. If it is
270- /// intended to incrementally write in-memory batches into the same spill file,
271- /// use [`Self::create_in_progress_file`] instead.
272- /// None is returned if no batches are spilled.
273- #[ allow( dead_code) ] // TODO: remove after change SMJ to use SpillManager
274- pub fn spill_record_batch_and_finish (
275- & self ,
276- batches : & [ RecordBatch ] ,
277- request_msg : & str ,
278- ) -> Result < Option < RefCountedTempFile > > {
279- let mut in_progress_file = self . create_in_progress_file ( request_msg) ?;
280-
281- for batch in batches {
282- in_progress_file. append_batch ( batch) ?;
283- }
284-
285- in_progress_file. finish ( )
286- }
287-
288- /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
289- /// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
290- #[ allow( dead_code) ] // TODO: remove after change aggregate to use SpillManager
291- pub fn spill_record_batch_by_size (
292- & self ,
293- batch : & RecordBatch ,
294- request_description : & str ,
295- row_limit : usize ,
296- ) -> Result < Option < RefCountedTempFile > > {
297- let total_rows = batch. num_rows ( ) ;
298- let mut batches = Vec :: new ( ) ;
299- let mut offset = 0 ;
300-
301- // It's ok to calculate all slices first, because slicing is zero-copy.
302- while offset < total_rows {
303- let length = std:: cmp:: min ( total_rows - offset, row_limit) ;
304- let sliced_batch = batch. slice ( offset, length) ;
305- batches. push ( sliced_batch) ;
306- offset += length;
307- }
308-
309- // Spill the sliced batches to disk
310- self . spill_record_batch_and_finish ( & batches, request_description)
311- }
312-
313- /// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
314- /// This method will generate output in FIFO order: the batch appended first
315- /// will be read first.
316- pub fn read_spill_as_stream (
317- & self ,
318- spill_file_path : RefCountedTempFile ,
319- ) -> Result < SendableRecordBatchStream > {
320- let mut builder = RecordBatchReceiverStream :: builder (
321- Arc :: clone ( & self . schema ) ,
322- self . batch_read_buffer_capacity ,
323- ) ;
324- let sender = builder. tx ( ) ;
325-
326- builder. spawn_blocking ( move || read_spill ( sender, spill_file_path. path ( ) ) ) ;
327-
328- Ok ( builder. build ( ) )
329- }
330- }
331-
332- /// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
333- /// Caller is able to use this struct to incrementally append in-memory batches to
334- /// the file, and then finalize the file by calling the `finish` method.
335- pub ( crate ) struct InProgressSpillFile {
336- spill_writer : Arc < SpillManager > ,
337- /// Lazily initialized writer
338- writer : Option < IPCStreamWriter > ,
339- /// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
340- in_progress_file : Option < RefCountedTempFile > ,
341- }
342-
343- impl InProgressSpillFile {
344- pub fn new (
345- spill_writer : Arc < SpillManager > ,
346- in_progress_file : RefCountedTempFile ,
347- ) -> Self {
348- Self {
349- spill_writer,
350- in_progress_file : Some ( in_progress_file) ,
351- writer : None ,
352- }
353- }
354-
355- /// Appends a `RecordBatch` to the file, initializing the writer if necessary.
356- pub fn append_batch ( & mut self , batch : & RecordBatch ) -> Result < ( ) > {
357- if self . in_progress_file . is_none ( ) {
358- return Err ( exec_datafusion_err ! (
359- "Append operation failed: No active in-progress file. The file may have already been finalized."
360- ) ) ;
361- }
362- if self . writer . is_none ( ) {
363- let schema = batch. schema ( ) ;
364- if let Some ( ref in_progress_file) = self . in_progress_file {
365- self . writer = Some ( IPCStreamWriter :: new (
366- in_progress_file. path ( ) ,
367- schema. as_ref ( ) ,
368- ) ?) ;
369-
370- // Update metrics
371- self . spill_writer . metrics . spill_file_count . add ( 1 ) ;
372- }
373- }
374- if let Some ( writer) = & mut self . writer {
375- let ( spilled_rows, spilled_bytes) = writer. write ( batch) ?;
376-
377- // Update metrics
378- self . spill_writer . metrics . spilled_bytes . add ( spilled_bytes) ;
379- self . spill_writer . metrics . spilled_rows . add ( spilled_rows) ;
380- }
381- Ok ( ( ) )
382- }
383-
384- /// Finalizes the file, returning the completed file reference.
385- /// If there are no batches spilled before, it returns `None`.
386- pub fn finish ( & mut self ) -> Result < Option < RefCountedTempFile > > {
387- if let Some ( writer) = & mut self . writer {
388- writer. finish ( ) ?;
389- } else {
390- return Ok ( None ) ;
391- }
392-
393- Ok ( self . in_progress_file . take ( ) )
394- }
395- }
396-
397232#[ cfg( test) ]
398233mod tests {
234+ use super :: in_progress_spill_file:: InProgressSpillFile ;
399235 use super :: * ;
400236 use crate :: common:: collect;
401237 use crate :: metrics:: ExecutionPlanMetricsSet ;
238+ use crate :: metrics:: SpillMetrics ;
239+ use crate :: spill:: spill_manager:: SpillManager ;
402240 use crate :: test:: build_table_i32;
403241 use arrow:: array:: { Float64Array , Int32Array , ListArray , StringArray } ;
404242 use arrow:: compute:: cast;
405243 use arrow:: datatypes:: { DataType , Field , Int32Type , Schema } ;
406244 use arrow:: record_batch:: RecordBatch ;
407245 use datafusion_common:: Result ;
246+ use datafusion_execution:: runtime_env:: RuntimeEnv ;
408247
409248 use std:: sync:: Arc ;
410249
0 commit comments