@@ -21,7 +21,7 @@ use datafusion::datasource::source_as_provider;
21
21
use datafusion:: error:: DataFusionError ;
22
22
use datafusion:: physical_plan:: { ExecutionPlan , ExecutionPlanProperties } ;
23
23
use std:: any:: type_name;
24
- use std:: collections:: HashMap ;
24
+ use std:: collections:: { HashMap , HashSet } ;
25
25
use std:: sync:: Arc ;
26
26
use std:: time:: Instant ;
27
27
@@ -174,17 +174,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
174
174
tokio:: spawn ( async move {
175
175
let mut if_revive = false ;
176
176
match state. launch_tasks ( schedulable_tasks) . await {
177
- Ok ( unassigned_executor_slots) => {
178
- if !unassigned_executor_slots. is_empty ( ) {
179
- if let Err ( e) = state
180
- . executor_manager
181
- . unbind_tasks ( unassigned_executor_slots)
182
- . await
177
+ Ok ( launch_tasks_futs) => {
178
+ let unassigned_slots = launch_tasks_futs
179
+ . iter ( )
180
+ . flat_map ( LaunchMultiTaskFut :: unassigned_slot)
181
+ . collect :: < Vec < _ > > ( ) ;
182
+ if !unassigned_slots. is_empty ( ) {
183
+ if let Err ( e) =
184
+ state. executor_manager . unbind_tasks ( unassigned_slots) . await
183
185
{
184
186
error ! ( "Fail to unbind tasks: {}" , e) ;
185
187
}
186
188
if_revive = true ;
187
189
}
190
+ let failed_jobs = launch_tasks_futs
191
+ . into_iter ( )
192
+ . flat_map ( |fut| fut. prepare_failed_jobs . into_keys ( ) )
193
+ . collect :: < HashSet < String > > ( ) ;
194
+ for job_id in failed_jobs {
195
+ if let Err ( e) = sender
196
+ . post_event ( QueryStageSchedulerEvent :: JobCancel ( job_id) )
197
+ . await
198
+ {
199
+ error ! ( "Cancel job error due to {e}" ) ;
200
+ }
201
+ }
188
202
}
189
203
Err ( e) => {
190
204
error ! ( "Fail to launch tasks: {}" , e) ;
@@ -248,7 +262,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
248
262
async fn launch_tasks (
249
263
& self ,
250
264
bound_tasks : Vec < BoundTask > ,
251
- ) -> Result < Vec < ExecutorSlot > > {
265
+ ) -> Result < Vec < LaunchMultiTaskFut > > {
252
266
// Put tasks to the same executor together
253
267
// And put tasks belonging to the same stage together for creating MultiTaskDefinition
254
268
let mut executor_stage_assignments: HashMap <
@@ -275,62 +289,58 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
275
289
276
290
let mut join_handles = vec ! [ ] ;
277
291
for ( executor_id, tasks) in executor_stage_assignments. into_iter ( ) {
278
- let tasks: Vec < Vec < TaskDescription > > = tasks. into_values ( ) . collect ( ) ;
279
292
// Total number of tasks to be launched for one executor
280
- let n_tasks: usize = tasks. iter ( ) . map ( |stage_tasks| stage_tasks . len ( ) ) . sum ( ) ;
293
+ let n_tasks: usize = tasks. values ( ) . map ( Vec :: len) . sum ( ) ;
281
294
282
295
let state = self . clone ( ) ;
283
296
let join_handle = tokio:: spawn ( async move {
284
- let success = match state
297
+ match state
285
298
. executor_manager
286
299
. get_executor_metadata ( & executor_id)
287
300
. await
288
301
{
289
302
Ok ( executor) => {
290
- if let Err ( e ) = state
303
+ match state
291
304
. task_manager
292
305
. launch_multi_task ( & executor, tasks, & state. executor_manager )
293
306
. await
294
307
{
295
- let err_msg = format ! ( "Failed to launch new task: {e}" ) ;
296
- error ! ( "{}" , err_msg. clone( ) ) ;
297
-
298
- // It's OK to remove executor aggressively,
299
- // since if the executor is in healthy state, it will be registered again.
300
- state. remove_executor ( & executor_id, Some ( err_msg) ) . await ;
301
-
302
- false
303
- } else {
304
- true
308
+ Ok ( prepare_failed_jobs) => LaunchMultiTaskFut :: new (
309
+ executor_id,
310
+ 0 ,
311
+ prepare_failed_jobs,
312
+ ) ,
313
+ Err ( e) => {
314
+ let err_msg = format ! ( "Failed to launch new task: {e}" ) ;
315
+ error ! ( "{}" , err_msg. clone( ) ) ;
316
+
317
+ // It's OK to remove executor aggressively,
318
+ // since if the executor is in healthy state, it will be registered again.
319
+ state. remove_executor ( & executor_id, Some ( err_msg) ) . await ;
320
+ LaunchMultiTaskFut :: new (
321
+ executor_id,
322
+ n_tasks,
323
+ HashMap :: new ( ) ,
324
+ )
325
+ }
305
326
}
306
327
}
307
328
Err ( e) => {
308
329
error ! ( "Failed to launch new task, could not get executor metadata: {}" , e) ;
309
- false
330
+ LaunchMultiTaskFut :: new ( executor_id , n_tasks , HashMap :: new ( ) )
310
331
}
311
- } ;
312
- if success {
313
- vec ! [ ]
314
- } else {
315
- vec ! [ ( executor_id. clone( ) , n_tasks as u32 ) ]
316
332
}
317
333
} ) ;
318
334
join_handles. push ( join_handle) ;
319
335
}
320
336
321
- let unassigned_executor_slots =
322
- futures:: future:: join_all ( join_handles)
323
- . await
324
- . into_iter ( )
325
- . collect :: < std:: result:: Result <
326
- Vec < Vec < ExecutorSlot > > ,
327
- tokio:: task:: JoinError ,
328
- > > ( ) ?;
329
-
330
- Ok ( unassigned_executor_slots
337
+ let launch_futs = futures:: future:: join_all ( join_handles)
338
+ . await
331
339
. into_iter ( )
332
- . flatten ( )
333
- . collect :: < Vec < ExecutorSlot > > ( ) )
340
+ . collect :: < std:: result:: Result < Vec < _ > , tokio:: task:: JoinError > > (
341
+ ) ?;
342
+
343
+ Ok ( launch_futs)
334
344
}
335
345
336
346
pub ( crate ) async fn update_task_statuses (
@@ -463,3 +473,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
463
473
) ;
464
474
}
465
475
}
476
+
477
+ pub ( crate ) struct LaunchMultiTaskFut {
478
+ pub executor_id : String ,
479
+ pub unassigned_num : usize ,
480
+ pub prepare_failed_jobs : HashMap < String , Vec < TaskDescription > > ,
481
+ }
482
+
483
+ impl LaunchMultiTaskFut {
484
+ pub fn new (
485
+ executor_id : String ,
486
+ unassigned_num : usize ,
487
+ prepare_failed_jobs : HashMap < String , Vec < TaskDescription > > ,
488
+ ) -> Self {
489
+ Self {
490
+ executor_id,
491
+ unassigned_num,
492
+ prepare_failed_jobs,
493
+ }
494
+ }
495
+
496
+ pub fn unassigned_slot ( & self ) -> Option < ExecutorSlot > {
497
+ let fail_num: usize = self . prepare_failed_jobs . values ( ) . map ( Vec :: len) . sum ( ) ;
498
+ let slots = ( self . unassigned_num + fail_num) as u32 ;
499
+ if slots > 0 {
500
+ Some ( ( self . executor_id . clone ( ) , slots) )
501
+ } else {
502
+ None
503
+ }
504
+ }
505
+ }
0 commit comments