@@ -537,14 +537,16 @@ def check_ray_port_and_cluster_healthy() -> Tuple[int, bool, bool]:
537
537
instance_setup .RAY_STATUS_WITH_SKY_RAY_PORT_COMMAND ,
538
538
stream_logs = False ,
539
539
require_outputs = True )
540
- if returncode :
541
- logger .debug ('Ray cluster on head is not ready.' )
542
- else :
543
- logger .debug ('Ray cluster on head is ready.' )
540
+ if not returncode :
544
541
ray_port = common_utils .decode_payload (stdout )['ray_port' ]
542
+ logger .debug (f'Ray cluster on head is up with port { ray_port } .' )
543
+
545
544
head_ray_needs_restart = bool (returncode )
546
- ray_cluster_healthy = is_ray_cluster_healthy (
547
- stdout , cluster_info .num_instances )
545
+ # This is a best effort check to see if the ray cluster has expected
546
+ # number of nodes connected.
547
+ ray_cluster_healthy = (not head_ray_needs_restart and
548
+ is_ray_cluster_healthy (
549
+ stdout , cluster_info .num_instances ))
548
550
return ray_port , ray_cluster_healthy , head_ray_needs_restart
549
551
550
552
status .update (
@@ -585,6 +587,9 @@ def check_ray_port_and_cluster_healthy() -> Tuple[int, bool, bool]:
585
587
custom_resource = custom_resource ,
586
588
cluster_info = cluster_info ,
587
589
ssh_credentials = ssh_credentials )
590
+ else :
591
+ logger .debug ('Ray cluster on head is ready. Skip starting ray '
592
+ 'cluster on head node.' )
588
593
589
594
# NOTE: We have to check all worker nodes to make sure they are all
590
595
# healthy, otherwise we can only start Ray on newly started worker
@@ -610,6 +615,9 @@ def check_ray_port_and_cluster_healthy() -> Tuple[int, bool, bool]:
610
615
ray_port = ray_port ,
611
616
cluster_info = cluster_info ,
612
617
ssh_credentials = ssh_credentials )
618
+ elif ray_cluster_healthy :
619
+ logger .debug ('Ray cluster is ready. Skip starting ray cluster on '
620
+ 'worker nodes.' )
613
621
614
622
instance_setup .start_skylet_on_head_node (cluster_name .name_on_cloud ,
615
623
cluster_info , ssh_credentials )
0 commit comments