Skip to content

Commit f7c84fe

Browse files
committed
fix lints
1 parent ed8c7d0 commit f7c84fe

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,8 +1383,8 @@ impl Stream for PerPartitionStream {
13831383
return Poll::Ready(Some(Err(e)));
13841384
}
13851385
Poll::Ready(None) => {
1386-
// Spill stream never ends, this shouldn't happen
1387-
unreachable!("SpillPoolStream should never end");
1386+
// Spill stream ended - all spilled data has been read
1387+
return Poll::Ready(None);
13881388
}
13891389
Poll::Pending => {
13901390
// No spilled data available

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ mod tests {
404404
use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
405405
use arrow::array::{ArrayRef, Int32Array};
406406
use arrow::datatypes::{DataType, Field, Schema};
407+
use datafusion_common_runtime::SpawnedTask;
407408
use datafusion_execution::runtime_env::RuntimeEnv;
408409
use futures::StreamExt;
409410
use std::task::Poll;
@@ -639,7 +640,7 @@ mod tests {
639640

640641
// Spawn a task that will push data after a delay
641642
let writer_pool = Arc::clone(&pool_arc);
642-
tokio::spawn(async move {
643+
SpawnedTask::spawn(async move {
643644
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
644645
let mut pool = writer_pool.lock();
645646
pool.push_batch(&create_test_batch(0, 10)).unwrap();
@@ -672,7 +673,7 @@ mod tests {
672673

673674
// Spawn task to flush after delay
674675
let writer_pool = Arc::clone(&pool_arc);
675-
tokio::spawn(async move {
676+
SpawnedTask::spawn(async move {
676677
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
677678
let mut pool = writer_pool.lock();
678679
pool.flush().unwrap();
@@ -700,7 +701,7 @@ mod tests {
700701

701702
// Finalize after delay
702703
let writer_pool = Arc::clone(&pool_arc);
703-
tokio::spawn(async move {
704+
SpawnedTask::spawn(async move {
704705
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
705706
writer_pool.lock().finalize();
706707
});
@@ -742,7 +743,7 @@ mod tests {
742743
let pool_arc = Arc::new(Mutex::new(pool));
743744

744745
let writer_pool = Arc::clone(&pool_arc);
745-
let writer = tokio::spawn(async move {
746+
let writer = SpawnedTask::spawn(async move {
746747
for i in 0..10 {
747748
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
748749
let mut pool = writer_pool.lock();
@@ -754,7 +755,7 @@ mod tests {
754755

755756
let reader_pool = Arc::clone(&pool_arc);
756757
let stream = SpillPool::reader(reader_pool, spill_manager);
757-
let reader = tokio::spawn(async move { collect_batches(stream).await });
758+
let reader = SpawnedTask::spawn(async move { collect_batches(stream).await });
758759

759760
// Wait for both tasks
760761
writer.await.unwrap();
@@ -822,8 +823,8 @@ mod tests {
822823
let stream2 = SpillPool::reader(Arc::clone(&pool_arc), spill_manager);
823824

824825
// Read from both concurrently
825-
let reader1 = tokio::spawn(async move { collect_batches(stream1).await });
826-
let reader2 = tokio::spawn(async move { collect_batches(stream2).await });
826+
let reader1 = SpawnedTask::spawn(async move { collect_batches(stream1).await });
827+
let reader2 = SpawnedTask::spawn(async move { collect_batches(stream2).await });
827828

828829
let batches1 = reader1.await.unwrap()?;
829830
let batches2 = reader2.await.unwrap()?;
@@ -843,7 +844,7 @@ mod tests {
843844
let pool_arc = Arc::new(Mutex::new(pool));
844845

845846
let writer_pool = Arc::clone(&pool_arc);
846-
let writer = tokio::spawn(async move {
847+
let writer = SpawnedTask::spawn(async move {
847848
// Write multiple batches that will cause rotation
848849
for i in 0..8 {
849850
{
@@ -859,7 +860,7 @@ mod tests {
859860
// Read concurrently
860861
let reader_pool = Arc::clone(&pool_arc);
861862
let stream = SpillPool::reader(reader_pool, spill_manager);
862-
let reader = tokio::spawn(async move { collect_batches(stream).await });
863+
let reader = SpawnedTask::spawn(async move { collect_batches(stream).await });
863864

864865
writer.await.unwrap();
865866
let batches = reader.await.unwrap()?;
@@ -936,7 +937,7 @@ mod tests {
936937

937938
// Write and read concurrently
938939
let writer_pool = Arc::clone(&pool_arc);
939-
let writer = tokio::spawn(async move {
940+
let writer = SpawnedTask::spawn(async move {
940941
for i in 0..10 {
941942
{
942943
let mut pool = writer_pool.lock();
@@ -950,7 +951,7 @@ mod tests {
950951

951952
let reader_pool = Arc::clone(&pool_arc);
952953
let stream = SpillPool::reader(reader_pool, spill_manager);
953-
let reader = tokio::spawn(async move {
954+
let reader = SpawnedTask::spawn(async move {
954955
let mut batches = Vec::new();
955956
let mut stream = stream;
956957
while let Some(result) = stream.next().await {

0 commit comments

Comments
 (0)