Skip to content

Commit ecb5d92

Browse files
committed
use buffered stream
1 parent af6b411 commit ecb5d92

File tree

2 files changed

+131
-12
lines changed

2 files changed

+131
-12
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 110 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ struct PartitionChannels {
101101
/// SpillPool for batched spilling with file handle reuse (FIFO semantics)
102102
/// Wrapped in Arc so it can be shared between input tasks and output streams
103103
spill_pool: Arc<Mutex<SpillPool>>,
104+
/// SpillManager for creating streams from spill files
105+
spill_manager: Arc<SpillManager>,
104106
}
105107

106108
struct ConsumingInputStreamsState {
@@ -265,6 +267,7 @@ impl RepartitionExecState {
265267
rx,
266268
reservation,
267269
spill_pool: Arc::new(Mutex::new(spill_pool)),
270+
spill_manager,
268271
},
269272
);
270273
}
@@ -741,7 +744,7 @@ impl ExecutionPlan for RepartitionExec {
741744
let num_input_partitions = input.output_partitioning().partition_count();
742745

743746
// lock scope
744-
let (mut rx, reservation, spill_pool, abort_helper) = {
747+
let (mut rx, reservation, spill_pool, spill_manager, abort_helper) = {
745748
// lock mutexes
746749
let mut state = state.lock();
747750
let state = state.consume_input_streams(
@@ -759,13 +762,20 @@ impl ExecutionPlan for RepartitionExec {
759762
rx,
760763
reservation,
761764
spill_pool,
765+
spill_manager,
762766
..
763767
} = state
764768
.channels
765769
.remove(&partition)
766770
.expect("partition not used yet");
767771

768-
(rx, reservation, spill_pool, Arc::clone(&state.abort_helper))
772+
(
773+
rx,
774+
reservation,
775+
spill_pool,
776+
spill_manager,
777+
Arc::clone(&state.abort_helper),
778+
)
769779
};
770780

771781
trace!(
@@ -784,6 +794,7 @@ impl ExecutionPlan for RepartitionExec {
784794
_drop_helper: Arc::clone(&abort_helper),
785795
reservation: Arc::clone(&reservation),
786796
spill_pool: Arc::clone(&spill_pool),
797+
spill_manager: Arc::clone(&spill_manager),
787798
state: RepartitionStreamState::ReceivingFromChannel,
788799
}) as SendableRecordBatchStream
789800
})
@@ -814,6 +825,7 @@ impl ExecutionPlan for RepartitionExec {
814825
_drop_helper: abort_helper,
815826
reservation,
816827
spill_pool,
828+
spill_manager,
817829
state: RepartitionStreamState::ReceivingFromChannel,
818830
}) as SendableRecordBatchStream)
819831
}
@@ -1191,6 +1203,8 @@ impl RepartitionExec {
11911203
enum RepartitionStreamState {
11921204
/// Waiting for next item from channel
11931205
ReceivingFromChannel,
1206+
/// Reading spilled batches from disk via SpillReaderStream (spawned blocking tasks); the held stream is polled, one batch per poll, until its spill file is exhausted
1207+
ReadingSpilledBatch(SendableRecordBatchStream),
11941208
}
11951209

11961210
struct RepartitionStream {
@@ -1215,6 +1229,9 @@ struct RepartitionStream {
12151229
/// SpillPool for batched spilling with FIFO semantics
12161230
spill_pool: Arc<Mutex<SpillPool>>,
12171231

1232+
/// SpillManager for creating streams from spill files
1233+
spill_manager: Arc<SpillManager>,
1234+
12181235
/// Current state of the stream
12191236
state: RepartitionStreamState,
12201237
}
@@ -1240,13 +1257,20 @@ impl Stream for RepartitionStream {
12401257
return Poll::Ready(Some(Ok(batch)));
12411258
}
12421259
Ok(RepartitionBatch::Spilled) => {
1243-
// Read from SpillPool (FIFO order)
1244-
match self.spill_pool.lock().pop_batch()? {
1245-
Some(batch) => {
1246-
return Poll::Ready(Some(Ok(batch)));
1260+
// Get next file from SpillPool and create a stream
1261+
let next_file = self.spill_pool.lock().take_next_file()?;
1262+
match next_file {
1263+
Some(spill_file) => {
1264+
// Create stream using SpillReaderStream + spawn_buffered
1265+
let stream = self
1266+
.spill_manager
1267+
.read_spill_as_stream(spill_file, None)?;
1268+
self.state =
1269+
RepartitionStreamState::ReadingSpilledBatch(stream);
1270+
continue;
12471271
}
12481272
None => {
1249-
// No spilled batches available, continue receiving
1273+
// No spilled files available, continue receiving from channel
12501274
continue;
12511275
}
12521276
}
@@ -1273,6 +1297,38 @@ impl Stream for RepartitionStream {
12731297
}
12741298
}
12751299
}
1300+
RepartitionStreamState::ReadingSpilledBatch(stream) => {
1301+
match futures::ready!(stream.poll_next_unpin(cx)) {
1302+
Some(Ok(batch)) => {
1303+
// Return batch and stay in ReadingSpilledBatch state
1304+
return Poll::Ready(Some(Ok(batch)));
1305+
}
1306+
Some(Err(e)) => {
1307+
// Error reading spilled batch
1308+
return Poll::Ready(Some(Err(e)));
1309+
}
1310+
None => {
1311+
// Current spill file exhausted, check if there are more
1312+
let next_file = self.spill_pool.lock().take_next_file()?;
1313+
match next_file {
1314+
Some(spill_file) => {
1315+
// Create stream for next file
1316+
let new_stream = self
1317+
.spill_manager
1318+
.read_spill_as_stream(spill_file, None)?;
1319+
self.state =
1320+
RepartitionStreamState::ReadingSpilledBatch(new_stream);
1321+
continue;
1322+
}
1323+
None => {
1324+
// No more spilled files, go back to receiving from channel
1325+
self.state = RepartitionStreamState::ReceivingFromChannel;
1326+
continue;
1327+
}
1328+
}
1329+
}
1330+
}
1331+
}
12761332
}
12771333
}
12781334
}
@@ -1303,6 +1359,9 @@ struct PerPartitionStream {
13031359
/// SpillPool for batched spilling with FIFO semantics (shared across streams)
13041360
spill_pool: Arc<Mutex<SpillPool>>,
13051361

1362+
/// SpillManager for creating streams from spill files
1363+
spill_manager: Arc<SpillManager>,
1364+
13061365
/// Current state of the stream
13071366
state: RepartitionStreamState,
13081367
}
@@ -1328,13 +1387,20 @@ impl Stream for PerPartitionStream {
13281387
return Poll::Ready(Some(Ok(batch)));
13291388
}
13301389
Ok(RepartitionBatch::Spilled) => {
1331-
// Read from SpillPool (FIFO order)
1332-
match self.spill_pool.lock().pop_batch()? {
1333-
Some(batch) => {
1334-
return Poll::Ready(Some(Ok(batch)));
1390+
// Get next file from SpillPool and create a stream
1391+
let next_file = self.spill_pool.lock().take_next_file()?;
1392+
match next_file {
1393+
Some(spill_file) => {
1394+
// Create stream using SpillReaderStream + spawn_buffered
1395+
let stream = self
1396+
.spill_manager
1397+
.read_spill_as_stream(spill_file, None)?;
1398+
self.state =
1399+
RepartitionStreamState::ReadingSpilledBatch(stream);
1400+
continue;
13351401
}
13361402
None => {
1337-
// No spilled batches available, continue receiving
1403+
// No spilled files available, continue receiving from channel
13381404
continue;
13391405
}
13401406
}
@@ -1350,6 +1416,38 @@ impl Stream for PerPartitionStream {
13501416
None => return Poll::Ready(None),
13511417
}
13521418
}
1419+
RepartitionStreamState::ReadingSpilledBatch(stream) => {
1420+
match futures::ready!(stream.poll_next_unpin(cx)) {
1421+
Some(Ok(batch)) => {
1422+
// Return batch and stay in ReadingSpilledBatch state
1423+
return Poll::Ready(Some(Ok(batch)));
1424+
}
1425+
Some(Err(e)) => {
1426+
// Error reading spilled batch
1427+
return Poll::Ready(Some(Err(e)));
1428+
}
1429+
None => {
1430+
// Current spill file exhausted, check if there are more
1431+
let next_file = self.spill_pool.lock().take_next_file()?;
1432+
match next_file {
1433+
Some(spill_file) => {
1434+
// Create stream for next file
1435+
let new_stream = self
1436+
.spill_manager
1437+
.read_spill_as_stream(spill_file, None)?;
1438+
self.state =
1439+
RepartitionStreamState::ReadingSpilledBatch(new_stream);
1440+
continue;
1441+
}
1442+
None => {
1443+
// No more spilled files, go back to receiving from channel
1444+
self.state = RepartitionStreamState::ReceivingFromChannel;
1445+
continue;
1446+
}
1447+
}
1448+
}
1449+
}
1450+
}
13531451
}
13541452
}
13551453
}

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,27 @@ impl SpillPool {
315315
}
316316
}
317317

318+
/// Takes the next spill file from the pool for reading.
319+
///
320+
/// Returns the file at the front of the `files` queue (the oldest unread file), or `None` if the queue is empty.
321+
/// The file is removed from the pool and should be read using
322+
/// `SpillManager::read_spill_as_stream()`.
323+
///
324+
/// If a write file is currently open (`current_write_file` is set), it is flushed before the queue is consulted.
325+
///
326+
/// # Errors
327+
///
328+
/// Returns an error if flushing pending writes fails; in that case no file is removed from the queue.
329+
pub fn take_next_file(&mut self) -> Result<Option<RefCountedTempFile>> {
330+
// Flush only when a write file is open; `?` returns early with the flush error
331+
if self.current_write_file.is_some() {
332+
self.flush()?;
333+
}
334+
335+
// Pop the front (oldest) queue entry and hand back only its `file` field
336+
Ok(self.files.pop_front().map(|spill_file| spill_file.file))
337+
}
338+
318339
/// Finalizes the current write file and adds it to the files queue.
319340
///
320341
/// Called automatically by `push_batch` when rotating files, but can

0 commit comments

Comments
 (0)