@@ -404,6 +404,7 @@ mod tests {
404404 use crate :: metrics:: { ExecutionPlanMetricsSet , SpillMetrics } ;
405405 use arrow:: array:: { ArrayRef , Int32Array } ;
406406 use arrow:: datatypes:: { DataType , Field , Schema } ;
407+ use datafusion_common_runtime:: SpawnedTask ;
407408 use datafusion_execution:: runtime_env:: RuntimeEnv ;
408409 use futures:: StreamExt ;
409410 use std:: task:: Poll ;
@@ -639,7 +640,7 @@ mod tests {
639640
640641 // Spawn a task that will push data after a delay
641642 let writer_pool = Arc :: clone ( & pool_arc) ;
642- tokio :: spawn ( async move {
643+ SpawnedTask :: spawn ( async move {
643644 tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 50 ) ) . await ;
644645 let mut pool = writer_pool. lock ( ) ;
645646 pool. push_batch ( & create_test_batch ( 0 , 10 ) ) . unwrap ( ) ;
@@ -672,7 +673,7 @@ mod tests {
672673
673674 // Spawn task to flush after delay
674675 let writer_pool = Arc :: clone ( & pool_arc) ;
675- tokio :: spawn ( async move {
676+ SpawnedTask :: spawn ( async move {
676677 tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 50 ) ) . await ;
677678 let mut pool = writer_pool. lock ( ) ;
678679 pool. flush ( ) . unwrap ( ) ;
@@ -700,7 +701,7 @@ mod tests {
700701
701702 // Finalize after delay
702703 let writer_pool = Arc :: clone ( & pool_arc) ;
703- tokio :: spawn ( async move {
704+ SpawnedTask :: spawn ( async move {
704705 tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 50 ) ) . await ;
705706 writer_pool. lock ( ) . finalize ( ) ;
706707 } ) ;
@@ -742,7 +743,7 @@ mod tests {
742743 let pool_arc = Arc :: new ( Mutex :: new ( pool) ) ;
743744
744745 let writer_pool = Arc :: clone ( & pool_arc) ;
745- let writer = tokio :: spawn ( async move {
746+ let writer = SpawnedTask :: spawn ( async move {
746747 for i in 0 ..10 {
747748 tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 10 ) ) . await ;
748749 let mut pool = writer_pool. lock ( ) ;
@@ -754,7 +755,7 @@ mod tests {
754755
755756 let reader_pool = Arc :: clone ( & pool_arc) ;
756757 let stream = SpillPool :: reader ( reader_pool, spill_manager) ;
757- let reader = tokio :: spawn ( async move { collect_batches ( stream) . await } ) ;
758+ let reader = SpawnedTask :: spawn ( async move { collect_batches ( stream) . await } ) ;
758759
759760 // Wait for both tasks
760761 writer. await . unwrap ( ) ;
@@ -822,8 +823,8 @@ mod tests {
822823 let stream2 = SpillPool :: reader ( Arc :: clone ( & pool_arc) , spill_manager) ;
823824
824825 // Read from both concurrently
825- let reader1 = tokio :: spawn ( async move { collect_batches ( stream1) . await } ) ;
826- let reader2 = tokio :: spawn ( async move { collect_batches ( stream2) . await } ) ;
826+ let reader1 = SpawnedTask :: spawn ( async move { collect_batches ( stream1) . await } ) ;
827+ let reader2 = SpawnedTask :: spawn ( async move { collect_batches ( stream2) . await } ) ;
827828
828829 let batches1 = reader1. await . unwrap ( ) ?;
829830 let batches2 = reader2. await . unwrap ( ) ?;
@@ -843,7 +844,7 @@ mod tests {
843844 let pool_arc = Arc :: new ( Mutex :: new ( pool) ) ;
844845
845846 let writer_pool = Arc :: clone ( & pool_arc) ;
846- let writer = tokio :: spawn ( async move {
847+ let writer = SpawnedTask :: spawn ( async move {
847848 // Write multiple batches that will cause rotation
848849 for i in 0 ..8 {
849850 {
@@ -859,7 +860,7 @@ mod tests {
859860 // Read concurrently
860861 let reader_pool = Arc :: clone ( & pool_arc) ;
861862 let stream = SpillPool :: reader ( reader_pool, spill_manager) ;
862- let reader = tokio :: spawn ( async move { collect_batches ( stream) . await } ) ;
863+ let reader = SpawnedTask :: spawn ( async move { collect_batches ( stream) . await } ) ;
863864
864865 writer. await . unwrap ( ) ;
865866 let batches = reader. await . unwrap ( ) ?;
@@ -936,7 +937,7 @@ mod tests {
936937
937938 // Write and read concurrently
938939 let writer_pool = Arc :: clone ( & pool_arc) ;
939- let writer = tokio :: spawn ( async move {
940+ let writer = SpawnedTask :: spawn ( async move {
940941 for i in 0 ..10 {
941942 {
942943 let mut pool = writer_pool. lock ( ) ;
@@ -950,7 +951,7 @@ mod tests {
950951
951952 let reader_pool = Arc :: clone ( & pool_arc) ;
952953 let stream = SpillPool :: reader ( reader_pool, spill_manager) ;
953- let reader = tokio :: spawn ( async move {
954+ let reader = SpawnedTask :: spawn ( async move {
954955 let mut batches = Vec :: new ( ) ;
955956 let mut stream = stream;
956957 while let Some ( result) = stream. next ( ) . await {
0 commit comments