From 74a8075e7deba2712cc9406be734d78c9548b7c7 Mon Sep 17 00:00:00 2001
From: zpoint
Date: Tue, 26 Nov 2024 17:40:57 +0800
Subject: [PATCH] Event based smoke tests -- managed jobs (#4386)

* event based smoke test
* more event based smoke test
* more test cases
* more test cases with managed jobs
* bug fix
* bump up seconds
* merge master and resolve conflict
* more test case
* support test_managed_jobs_pipeline_failed_setup
* support test_managed_jobs_recovery_aws
* managed job status
* bug fix
* test managed job cancel
* test_managed_jobs_storage
* more test cases
* resolve pr comment
* private member function
* bug fix
* interface change
* bug fix
* bug fix
* raise error on empty status
---
 tests/test_smoke.py | 415 ++++++++++++++++++++++++++++++--------------
 1 file changed, 288 insertions(+), 127 deletions(-)

diff --git a/tests/test_smoke.py b/tests/test_smoke.py
index 6ba81ce68f0..574dae21ea0 100644
--- a/tests/test_smoke.py
+++ b/tests/test_smoke.py
@@ -59,6 +59,7 @@
 from sky.data import data_utils
 from sky.data import storage as storage_lib
 from sky.data.data_utils import Rclone
+from sky.jobs.state import ManagedJobStatus
 from sky.skylet import constants
 from sky.skylet import events
 from sky.skylet.job_lib import JobStatus
@@ -101,8 +102,20 @@
 # Cluster functions
 _ALL_JOB_STATUSES = "|".join([status.value for status in JobStatus])
 _ALL_CLUSTER_STATUSES = "|".join([status.value for status in ClusterStatus])
+_ALL_MANAGED_JOB_STATUSES = "|".join(
+    [status.value for status in ManagedJobStatus])

-_WAIT_UNTIL_CLUSTER_STATUS_IS = (
+
+def _statuses_to_str(statuses: List[enum.Enum]):
+    """Convert a list of enums to a string with all the values separated by |."""
+    assert len(statuses) > 0, 'statuses must not be empty'
+    if len(statuses) > 1:
+        return '(' + '|'.join([status.value for status in statuses]) + ')'
+    else:
+        return statuses[0].value
+
+
+_WAIT_UNTIL_CLUSTER_STATUS_CONTAINS = (
     # A while loop to wait until the cluster status
    # becomes certain status, with timeout.
    'start_time=$SECONDS; '
@@ -120,6 +133,29 @@
     'sleep 10; '
     'done')

+
+def _get_cmd_wait_until_cluster_status_contains(
+        cluster_name: str, cluster_status: List[ClusterStatus], timeout: int):
+    return _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS.format(
+        cluster_name=cluster_name,
+        cluster_status=_statuses_to_str(cluster_status),
+        timeout=timeout)
+
+
+def _get_cmd_wait_until_cluster_status_contains_wildcard(
+        cluster_name_wildcard: str, cluster_status: List[ClusterStatus],
+        timeout: int):
+    wait_cmd = _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS.replace(
+        'sky status {cluster_name}',
+        'sky status "{cluster_name}"').replace('awk "/^{cluster_name}/',
+                                               'awk "/^{cluster_name_awk}/')
+    return wait_cmd.format(cluster_name=cluster_name_wildcard,
+                           cluster_name_awk=cluster_name_wildcard.replace(
+                               '*', '.*'),
+                           cluster_status=_statuses_to_str(cluster_status),
+                           timeout=timeout)
+
+
 _WAIT_UNTIL_CLUSTER_IS_NOT_FOUND = (
     # A while loop to wait until the cluster is not found or timeout
     'start_time=$SECONDS; '
@@ -130,10 +166,16 @@
     'if sky status -r {cluster_name}; sky status {cluster_name} | grep "{cluster_name} not found"; then '
     '  echo "Cluster {cluster_name} successfully removed."; break; '
     'fi; '
-    'echo "Waiting for cluster {name} to be removed..."; '
+    'echo "Waiting for cluster {cluster_name} to be removed..."; '
     'sleep 10; '
     'done')
+
+
+def _get_cmd_wait_until_cluster_is_not_found(cluster_name: str, timeout: int):
+    return _WAIT_UNTIL_CLUSTER_IS_NOT_FOUND.format(cluster_name=cluster_name,
+                                                   timeout=timeout)
+
+
 _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID = (
     # A while loop to wait until the job status
     # contains certain status, with timeout.
@@ -165,11 +207,51 @@
 _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME = _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID.replace(
     'awk "\\$1 == \\"{job_id}\\"', 'awk "\\$2 == \\"{job_name}\\"')
+
+
+def _get_cmd_wait_until_job_status_contains_matching_job_id(
+        cluster_name: str, job_id: str, job_status: List[JobStatus],
+        timeout: int):
+    return _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID.format(
+        cluster_name=cluster_name,
+        job_id=job_id,
+        job_status=_statuses_to_str(job_status),
+        timeout=timeout)
+
+
+def _get_cmd_wait_until_job_status_contains_without_matching_job(
+        cluster_name: str, job_status: List[JobStatus], timeout: int):
+    return _WAIT_UNTIL_JOB_STATUS_CONTAINS_WITHOUT_MATCHING_JOB.format(
+        cluster_name=cluster_name,
+        job_status=_statuses_to_str(job_status),
+        timeout=timeout)
+
+
+def _get_cmd_wait_until_job_status_contains_matching_job_name(
+        cluster_name: str, job_name: str, job_status: List[JobStatus],
+        timeout: int):
+    return _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format(
+        cluster_name=cluster_name,
+        job_name=job_name,
+        job_status=_statuses_to_str(job_status),
+        timeout=timeout)
+
+
 # Managed job functions
 _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME = _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.replace(
-    'sky queue {cluster_name}',
-    'sky jobs queue').replace('awk "\\$2 == ', 'awk "\\$3 == ')
+    'sky queue {cluster_name}', 'sky jobs queue').replace(
+        'awk "\\$2 == \\"{job_name}\\"',
+        'awk "\\$2 == \\"{job_name}\\" || \\$3 == \\"{job_name}\\"').replace(
+            _ALL_JOB_STATUSES, _ALL_MANAGED_JOB_STATUSES)
+
+
+def _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+        job_name: str, job_status: List[JobStatus], timeout: int):
+    return _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format(
+        job_name=job_name,
+        job_status=_statuses_to_str(job_status),
+        timeout=timeout)
+
# After the timeout, the cluster will stop if autostop is set, and our check
# should be more than the timeout. To address this, we extend the timeout by
@@ -489,11 +571,13 @@ def test_launch_fast_with_autostop(generic_cloud: str):
         f'sky status -r {name} | grep UP',
         # Ensure cluster is stopped
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=autostop_timeout),
-
+        # Even after the cluster is stopped, the cloud platform may take a
+        # while to delete the VM.
+        f'sleep {_BUMP_UP_SECONDS}',
         # Launch again. Do full output validation - we expect the cluster to re-launch
         f'unset SKYPILOT_DEBUG; s=$(sky launch -y -c {name} --fast -i 1 tests/test_yamls/minimal.yaml) && {_VALIDATE_LAUNCH_OUTPUT}',
         f'sky logs {name} 2 --status',
@@ -530,6 +614,7 @@ def test_aws_region():
 @pytest.mark.aws
 def test_aws_with_ssh_proxy_command():
     name = _get_cluster_name()
+
     with tempfile.NamedTemporaryFile(mode='w') as f:
         f.write(
             textwrap.dedent(f"""\
@@ -551,10 +636,18 @@ def test_aws_with_ssh_proxy_command():
             f'sky jobs launch -n {name}-0 --cloud aws --cpus 2 --use-spot -y echo hi',
             # Wait other tests to create the job controller first, so that
             # the job controller is not launched with proxy command.
-            'timeout 300s bash -c "until sky status sky-jobs-controller* | grep UP; do sleep 1; done"',
+            _get_cmd_wait_until_cluster_status_contains_wildcard(
+                cluster_name_wildcard='sky-jobs-controller-*',
+                cluster_status=[ClusterStatus.UP],
+                timeout=300),
             f'export SKYPILOT_CONFIG={f.name}; sky jobs launch -n {name} --cpus 2 --cloud aws --region us-east-1 -yd echo hi',
-            'sleep 300',
-            f'{_GET_JOB_QUEUE} | grep {name} | grep "STARTING\|RUNNING\|SUCCEEDED"',
+            _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+                job_name=name,
+                job_status=[
+                    ManagedJobStatus.SUCCEEDED, ManagedJobStatus.RUNNING,
+                    ManagedJobStatus.STARTING
+                ],
+                timeout=300),
         ],
         f'sky down -y {name} jump-{name}; sky jobs cancel -y -n {name}',
     )
@@ -924,9 +1017,9 @@ def test_clone_disk_aws():
         f'sky launch -y -c {name} --cloud aws --region us-east-2 --retry-until-up "echo hello > ~/user_file.txt"',
         f'sky launch --clone-disk-from {name} -y -c {name}-clone && exit 1 || true',
         f'sky stop {name} -y',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=60),
         # Wait for EC2 instance to be in stopped state.
         # TODO: event based wait.
@@ -976,8 +1069,8 @@ def test_gcp_mig():
         # Check MIG exists.
         f'gcloud compute instance-groups managed list --format="value(name)" | grep "^sky-mig-{name}"',
         f'sky autostop -i 0 --down -y {name}',
-        _WAIT_UNTIL_CLUSTER_IS_NOT_FOUND.format(cluster_name=name,
-                                                timeout=120),
+        _get_cmd_wait_until_cluster_is_not_found(cluster_name=name,
+                                                 timeout=120),
         f'gcloud compute instance-templates list | grep "sky-it-{name}"',
         # Launch again with the same region. The original instance template
         # should be removed.
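(Editor's aside, not part of the patch: the helpers introduced above turn a list of status enums into a grep-able alternation and substitute it into a bash polling loop with a timeout. The minimal, self-contained Python sketch below only illustrates that pattern; `DemoStatus` and `DEMO_WAIT_TEMPLATE` are hypothetical stand-ins for the real ClusterStatus and _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS, so the exact command text differs from the patch.)

import enum
from typing import List


class DemoStatus(enum.Enum):
    # Hypothetical stand-in for sky's ClusterStatus.
    INIT = 'INIT'
    UP = 'UP'
    STOPPED = 'STOPPED'


def statuses_to_str(statuses: List[enum.Enum]) -> str:
    # Mirrors _statuses_to_str in the patch: a single value stays bare,
    # multiple values become an alternation group usable by grep/awk.
    assert len(statuses) > 0, 'statuses must not be empty'
    if len(statuses) > 1:
        return '(' + '|'.join(s.value for s in statuses) + ')'
    return statuses[0].value


# Hypothetical wait-loop template in the same spirit as the patch's
# _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS: poll until the status matches
# or the timeout is exceeded.
DEMO_WAIT_TEMPLATE = (
    'start_time=$SECONDS; '
    'while true; do '
    'if (( $SECONDS - $start_time > {timeout} )); then '
    '  echo "Timeout waiting for cluster {cluster_name}"; exit 1; '
    'fi; '
    's=$(sky status {cluster_name}); '
    'echo "$s" | grep -E "{cluster_status}" && break; '
    'sleep 10; '
    'done')

if __name__ == '__main__':
    # Prints a shell command whose grep pattern is "(STOPPED|INIT)".
    print(
        DEMO_WAIT_TEMPLATE.format(
            cluster_name='my-cluster',
            cluster_status=statuses_to_str(
                [DemoStatus.STOPPED, DemoStatus.INIT]),
            timeout=200))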
@@ -1044,9 +1137,9 @@ def test_custom_default_conda_env(generic_cloud: str):
         f'sky exec {name} tests/test_yamls/test_custom_default_conda_env.yaml',
         f'sky logs {name} 2 --status',
         f'sky autostop -y -i 0 {name}',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=80),
         f'sky start -y {name}',
         f'sky logs {name} 2 --no-follow | grep -E "myenv\\s+\\*"',
@@ -1068,9 +1161,9 @@ def test_stale_job(generic_cloud: str):
         f'sky launch -y -c {name} --cloud {generic_cloud} "echo hi"',
         f'sky exec {name} -d "echo start; sleep 10000"',
         f'sky stop {name} -y',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=100),
         f'sky start {name} -y',
         f'sky logs {name} 1 --status',
@@ -1099,17 +1192,17 @@ def test_aws_stale_job_manual_restart():
         '--output text`; '
         f'aws ec2 stop-instances --region {region} '
         '--instance-ids $id',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=40),
         f'sky launch -c {name} -y "echo hi"',
         f'sky logs {name} 1 --status',
         f'sky logs {name} 3 --status',
         # Ensure the skylet updated the stale job status.
-        _WAIT_UNTIL_JOB_STATUS_CONTAINS_WITHOUT_MATCHING_JOB.format(
+        _get_cmd_wait_until_job_status_contains_without_matching_job(
             cluster_name=name,
-            job_status=JobStatus.FAILED_DRIVER.value,
+            job_status=[JobStatus.FAILED_DRIVER],
             timeout=events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS),
     ],
     f'sky down -y {name}',
@@ -1140,9 +1233,9 @@ def test_gcp_stale_job_manual_restart():
         f'sky logs {name} 1 --status',
         f'sky logs {name} 3 --status',
         # Ensure the skylet updated the stale job status.
-        _WAIT_UNTIL_JOB_STATUS_CONTAINS_WITHOUT_MATCHING_JOB.format(
+        _get_cmd_wait_until_job_status_contains_without_matching_job(
             cluster_name=name,
-            job_status=JobStatus.FAILED_DRIVER.value,
+            job_status=[JobStatus.FAILED_DRIVER],
             timeout=events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS)
     ],
     f'sky down -y {name}',
@@ -1820,6 +1913,7 @@ def test_large_job_queue(generic_cloud: str):
         f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done',
         f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16',
         'sleep 90',
+        # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running.
         # The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING.
         f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep PENDING | wc -l | grep 43',
@@ -1962,10 +2056,10 @@ def test_multi_echo(generic_cloud: str):
     ] +
     # Ensure jobs succeeded.
     [
-        _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID.format(
+        _get_cmd_wait_until_job_status_contains_matching_job_id(
             cluster_name=name,
             job_id=i + 1,
-            job_status=JobStatus.SUCCEEDED.value,
+            job_status=[JobStatus.SUCCEEDED],
             timeout=120) for i in range(32)
     ] +
     # Ensure monitor/autoscaler didn't crash on the 'assert not
@@ -2539,17 +2633,16 @@ def test_gcp_start_stop():
         f'sky exec {name} "prlimit -n --pid=\$(pgrep -f \'raylet/raylet --raylet_socket_name\') | grep \'"\'1048576 1048576\'"\'"',  # Ensure the raylet process has the correct file descriptor limit.
         f'sky logs {name} 3 --status',  # Ensure the job succeeded.
         f'sky stop -y {name}',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=40),
         f'sky start -y {name} -i 1',
         f'sky exec {name} examples/gcp_start_stop.yaml',
         f'sky logs {name} 4 --status',  # Ensure the job succeeded.
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=
-            f'({ClusterStatus.STOPPED.value}|{ClusterStatus.INIT.value})',
+            cluster_status=[ClusterStatus.STOPPED, ClusterStatus.INIT],
             timeout=200),
     ],
     f'sky down -y {name}',
@@ -2573,10 +2666,9 @@ def test_azure_start_stop():
         f'sky start -y {name} -i 1',
         f'sky exec {name} examples/azure_start_stop.yaml',
         f'sky logs {name} 3 --status',  # Ensure the job succeeded.
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=
-            f'({ClusterStatus.STOPPED.value}|{ClusterStatus.INIT.value})',
+            cluster_status=[ClusterStatus.STOPPED, ClusterStatus.INIT],
             timeout=280) +
         f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}',
     ],
@@ -2614,9 +2706,9 @@ def test_autostop(generic_cloud: str):
         f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep UP',
         # Ensure the cluster is STOPPED.
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=autostop_timeout),
         # Ensure the cluster is UP and the autostop setting is reset ('-').
@@ -2633,9 +2725,9 @@ def test_autostop(generic_cloud: str):
         f'sky autostop -y {name} -i 1',  # Should restart the timer.
         'sleep 40',
         f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep UP',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=autostop_timeout),
         # Test restarting the idleness timer via exec:
@@ -2645,10 +2737,9 @@ def test_autostop(generic_cloud: str):
         'sleep 45',  # Almost reached the threshold.
         f'sky exec {name} echo hi',  # Should restart the timer.
         'sleep 45',
-        f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep UP',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=autostop_timeout + _BUMP_UP_SECONDS),
     ],
     f'sky down -y {name}',
@@ -2866,18 +2957,18 @@ def test_stop_gcp_spot():
         f'sky exec {name} -- ls myfile',
         f'sky logs {name} 2 --status',
         f'sky autostop {name} -i0 -y',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=90),
         f'sky start {name} -y',
         f'sky exec {name} -- ls myfile',
         f'sky logs {name} 3 --status',
         # -i option at launch should go through:
         f'sky launch -c {name} -i0 -y',
-        _WAIT_UNTIL_CLUSTER_STATUS_IS.format(
+        _get_cmd_wait_until_cluster_status_contains(
             cluster_name=name,
-            cluster_status=ClusterStatus.STOPPED.value,
+            cluster_status=[ClusterStatus.STOPPED],
             timeout=120),
     ],
     f'sky down -y {name}',
@@ -2898,20 +2989,24 @@ def test_managed_jobs(generic_cloud: str):
     [
         f'sky jobs launch -n {name}-1 --cloud {generic_cloud} examples/managed_job.yaml -y -d',
         f'sky jobs launch -n {name}-2 --cloud {generic_cloud} examples/managed_job.yaml -y -d',
-        _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format(
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
             job_name=f'{name}-1',
-            job_status=
-            f'({JobStatus.PENDING.value}|{JobStatus.INIT.value}|{JobStatus.RUNNING.value})',
+            job_status=[
+                ManagedJobStatus.PENDING, ManagedJobStatus.SUBMITTED,
+                ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING
+            ],
             timeout=60),
-        _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format(
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
             job_name=f'{name}-2',
-            job_status=
-            f'({JobStatus.PENDING.value}|{JobStatus.INIT.value}|{JobStatus.RUNNING.value})',
+            job_status=[
+                ManagedJobStatus.PENDING, ManagedJobStatus.SUBMITTED,
+                ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING
+            ],
             timeout=60),
         f'sky jobs cancel -y -n {name}-1',
-        _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format(
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
            job_name=f'{name}-1',
-           job_status=f'{JobStatus.CANCELLED.value}',
+           job_status=[ManagedJobStatus.CANCELLED],
            timeout=230),
         # Test the functionality for logging.
         f's=$(sky jobs logs -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "start counting"',
@@ -2983,9 +3078,9 @@ def test_managed_jobs_failed_setup(generic_cloud: str):
     [
         f'sky jobs launch -n {name} --cloud {generic_cloud} -y -d tests/test_yamls/failed_setup.yaml',
         # Make sure the job failed quickly.
-        _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format(
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
            job_name=name,
-           job_status=f'{JobStatus.FAILED_SETUP.value}',
+           job_status=[ManagedJobStatus.FAILED_SETUP],
            timeout=330 + _BUMP_UP_SECONDS),
     ],
     f'sky jobs cancel -y -n {name}',
@@ -3009,7 +3104,10 @@ def test_managed_jobs_pipeline_failed_setup(generic_cloud: str):
     'managed_jobs_pipeline_failed_setup',
     [
         f'sky jobs launch -n {name} -y -d tests/test_yamls/failed_setup_pipeline.yaml',
-        'sleep 600',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.FAILED_SETUP],
+            timeout=600),
         # Make sure the job failed quickly.
         f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "FAILED_SETUP"',
         # Task 0 should be SUCCEEDED.
@@ -3043,8 +3141,10 @@ def test_managed_jobs_recovery_aws(aws_config_region):
     'managed_jobs_recovery_aws',
     [
         f'sky jobs launch --cloud aws --region {region} --use-spot -n {name} "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d',
-        'sleep 360',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=600),
         f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id',
         # Terminate the cluster manually.
         (f'aws ec2 terminate-instances --region {region} --instance-ids $('
         f'aws ec2 describe-instances --region {region} '
         f'--filters Name=tag:ray-cluster-name,Values={name_on_cloud}* '
         f'--query Reservations[].Instances[].InstanceId '
         '--output text)'),
         _JOB_WAIT_NOT_RUNNING.format(job_name=name),
         f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"',
-        'sleep 200',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=200),
         f'RUN_ID=$(cat /tmp/{name}-run-id); echo "$RUN_ID"; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | grep "$RUN_ID"',
     ],
     f'sky jobs cancel -y -n {name}',
@@ -3083,15 +3185,19 @@ def test_managed_jobs_recovery_gcp():
     'managed_jobs_recovery_gcp',
     [
         f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot --cpus 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d',
-        'sleep 360',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=300),
         f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id',
         # Terminate the cluster manually.
         terminate_cmd,
         _JOB_WAIT_NOT_RUNNING.format(job_name=name),
         f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"',
-        'sleep 200',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=200),
         f'RUN_ID=$(cat /tmp/{name}-run-id); echo "$RUN_ID"; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"',
     ],
     f'sky jobs cancel -y -n {name}',
@@ -3114,8 +3220,10 @@ def test_managed_jobs_pipeline_recovery_aws(aws_config_region):
     'managed_jobs_pipeline_recovery_aws',
     [
         f'sky jobs launch -n {name} tests/test_yamls/pipeline_aws.yaml -y -d',
-        'sleep 400',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=400),
         f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id',
         f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids',
         # Terminate the cluster manually.
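(Editor's aside, not part of the patch: at run time the generated wait commands poll `sky jobs queue` until the named managed job shows one of the expected statuses or the timeout expires, which is what lets these tests drop the fixed `sleep` calls. The sketch below is a hypothetical native-Python equivalent of that loop, shown only to make the polling semantics concrete; the patch itself does this in bash via the string templates defined earlier.)

import subprocess
import time
from typing import List


def wait_for_managed_job_status(job_name: str,
                                statuses: List[str],
                                timeout: int,
                                poll_interval: int = 15) -> str:
    """Poll `sky jobs queue` until `job_name` shows one of `statuses`."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        out = subprocess.run(['sky', 'jobs', 'queue'],
                             capture_output=True,
                             text=True).stdout
        for line in out.splitlines():
            fields = line.split()
            # Like the awk filter in the patch, accept the job name in either
            # the 2nd or 3rd column (pipeline entries shift the columns).
            if job_name in fields[1:3] and any(s in fields for s in statuses):
                return line
        time.sleep(poll_interval)
    raise TimeoutError(
        f'Job {job_name!r} did not reach {statuses} within {timeout}s')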
@@ -3134,8 +3242,10 @@ def test_managed_jobs_pipeline_recovery_aws(aws_config_region):
         '--output text)'),
         _JOB_WAIT_NOT_RUNNING.format(job_name=name),
         f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"',
-        'sleep 200',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=200),
         f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"',
         f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids-new',
         f'diff /tmp/{name}-run-ids /tmp/{name}-run-ids-new',
@@ -3165,8 +3275,10 @@ def test_managed_jobs_pipeline_recovery_gcp():
     'managed_jobs_pipeline_recovery_gcp',
     [
         f'sky jobs launch -n {name} tests/test_yamls/pipeline_gcp.yaml -y -d',
-        'sleep 400',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=400),
         f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id',
         f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids',
         # Terminate the cluster manually.
@@ -3177,8 +3289,10 @@ def test_managed_jobs_pipeline_recovery_gcp():
         f'cut -d\'_\' -f1 | rev | cut -d\'-\' -f1`; {terminate_cmd}'),
         _JOB_WAIT_NOT_RUNNING.format(job_name=name),
         f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"',
-        'sleep 200',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=200),
         f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"',
         f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids-new',
         f'diff /tmp/{name}-run-ids /tmp/{name}-run-ids-new',
@@ -3204,8 +3318,12 @@ def test_managed_jobs_recovery_default_resources(generic_cloud: str):
     'managed-spot-recovery-default-resources',
     [
         f'sky jobs launch -n {name} --cloud {generic_cloud} --use-spot "sleep 30 && sudo shutdown now && sleep 1000" -y -d',
-        'sleep 360',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING\|RECOVERING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[
+                ManagedJobStatus.RUNNING, ManagedJobStatus.RECOVERING
+            ],
+            timeout=360),
     ],
     f'sky jobs cancel -y -n {name}',
     timeout=25 * 60,
@@ -3225,8 +3343,10 @@ def test_managed_jobs_recovery_multi_node_aws(aws_config_region):
     'managed_jobs_recovery_multi_node_aws',
     [
         f'sky jobs launch --cloud aws --region {region} -n {name} --use-spot --num-nodes 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d',
-        'sleep 450',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=450),
         f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id',
         # Terminate the worker manually.
(f'aws ec2 terminate-instances --region {region} --instance-ids $(' @@ -3237,8 +3357,10 @@ def test_managed_jobs_recovery_multi_node_aws(aws_config_region): '--output text)'), _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 560', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=560), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2 | grep "$RUN_ID"', ], f'sky jobs cancel -y -n {name}', @@ -3266,15 +3388,19 @@ def test_managed_jobs_recovery_multi_node_gcp(): 'managed_jobs_recovery_multi_node_gcp', [ f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot --num-nodes 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', - 'sleep 400', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=400), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the worker manually. terminate_cmd, _JOB_WAIT_NOT_RUNNING.format(job_name=name), f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', - 'sleep 420', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.RUNNING], + timeout=560), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2 | grep "$RUN_ID"', ], f'sky jobs cancel -y -n {name}', @@ -3299,13 +3425,17 @@ def test_managed_jobs_cancellation_aws(aws_config_region): [ # Test cancellation during spot cluster being launched. f'sky jobs launch --cloud aws --region {region} -n {name} --use-spot "sleep 1000" -y -d', - 'sleep 60', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "STARTING\|RUNNING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ + ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING + ], + timeout=60 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_on_cloud}-* ' f'--query Reservations[].Instances[].State[].Name ' @@ -3313,12 +3443,16 @@ def test_managed_jobs_cancellation_aws(aws_config_region): ), # Test cancelling the spot cluster during spot job being setup. f'sky jobs launch --cloud aws --region {region} -n {name}-2 --use-spot tests/test_yamls/test_long_setup.yaml -y -d', - 'sleep 300', + # The job is set up in the cluster, will shown as RUNNING. 
+ _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-2', + job_status=[ManagedJobStatus.RUNNING], + timeout=300 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}-2', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-2', + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_2_on_cloud}-* ' f'--query Reservations[].Instances[].State[].Name ' @@ -3326,8 +3460,11 @@ def test_managed_jobs_cancellation_aws(aws_config_region): ), # Test cancellation during spot job is recovering. f'sky jobs launch --cloud aws --region {region} -n {name}-3 --use-spot "sleep 1000" -y -d', - 'sleep 300', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RUNNING"', + # The job is running in the cluster, will shown as RUNNING. + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-3', + job_status=[ManagedJobStatus.RUNNING], + timeout=300 + _BUMP_UP_SECONDS), # Terminate the cluster manually. (f'aws ec2 terminate-instances --region {region} --instance-ids $(' f'aws ec2 describe-instances --region {region} ' @@ -3337,10 +3474,10 @@ def test_managed_jobs_cancellation_aws(aws_config_region): _JOB_WAIT_NOT_RUNNING.format(job_name=f'{name}-3'), f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RECOVERING"', f'sky jobs cancel -y -n {name}-3', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=f'{name}-3', + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), # The cluster should be terminated (shutting-down) after cancellation. We don't use the `=` operator here because # there can be multiple VM with the same name due to the recovery. (f's=$(aws ec2 describe-instances --region {region} ' @@ -3375,34 +3512,42 @@ def test_managed_jobs_cancellation_gcp(): [ # Test cancellation during spot cluster being launched. f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot "sleep 1000" -y -d', - 'sleep 60', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "STARTING"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.STARTING], + timeout=60 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}', - 'sleep 5', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', - f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "CANCELLED"', + _get_cmd_wait_until_managed_job_status_contains_matching_job_name( + job_name=name, + job_status=[ManagedJobStatus.CANCELLED], + timeout=120 + _BUMP_UP_SECONDS), # Test cancelling the spot cluster during spot job being setup. f'sky jobs launch --cloud gcp --zone {zone} -n {name}-2 --use-spot tests/test_yamls/test_long_setup.yaml -y -d', - 'sleep 300', + # The job is set up in the cluster, will shown as RUNNING. 
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=f'{name}-2',
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=300 + _BUMP_UP_SECONDS),
         f'sky jobs cancel -y -n {name}-2',
-        'sleep 5',
-        f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLING\|CANCELLED"',
-        'sleep 120',
-        f'{_GET_JOB_QUEUE} | grep {name}-2 | head -n1 | grep "CANCELLED"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=f'{name}-2',
+            job_status=[ManagedJobStatus.CANCELLED],
+            timeout=120 + _BUMP_UP_SECONDS),
         # Test cancellation during spot job is recovering.
         f'sky jobs launch --cloud gcp --zone {zone} -n {name}-3 --use-spot "sleep 1000" -y -d',
-        'sleep 300',
-        f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RUNNING"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=f'{name}-3',
+            job_status=[ManagedJobStatus.RUNNING],
+            timeout=300 + _BUMP_UP_SECONDS),
         # Terminate the cluster manually.
         terminate_cmd,
         _JOB_WAIT_NOT_RUNNING.format(job_name=f'{name}-3'),
         f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "RECOVERING"',
         f'sky jobs cancel -y -n {name}-3',
-        'sleep 5',
-        f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLING\|CANCELLED"',
-        'sleep 120',
-        f'{_GET_JOB_QUEUE} | grep {name}-3 | head -n1 | grep "CANCELLED"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=f'{name}-3',
+            job_status=[ManagedJobStatus.CANCELLED],
+            timeout=120 + _BUMP_UP_SECONDS),
         # The cluster should be terminated (STOPPING) after cancellation. We don't use the `=` operator here because
         # there can be multiple VM with the same name due to the recovery.
         (f's=$({query_state_cmd}) && echo "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "PROVISIONING|STAGING|RUNNING|REPAIRING|TERMINATED|SUSPENDING|SUSPENDED|SUSPENDED"'
@@ -3492,8 +3637,10 @@ def test_managed_jobs_storage(generic_cloud: str):
         *STORAGE_SETUP_COMMANDS,
         f'sky jobs launch -n {name}{use_spot} --cloud {generic_cloud}{region_flag} {file_path} -y',
         region_validation_cmd,  # Check if the bucket is created in the correct region
-        'sleep 60',  # Wait the spot queue to be updated
-        f'{_GET_JOB_QUEUE} | grep {name} | grep SUCCEEDED',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.SUCCEEDED],
+            timeout=60 + _BUMP_UP_SECONDS),
         f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]',
         # Check if file was written to the mounted output bucket
         output_check_cmd
@@ -3517,10 +3664,17 @@ def test_managed_jobs_tpu():
     'test-spot-tpu',
     [
         f'sky jobs launch -n {name} --use-spot examples/tpu/tpuvm_mnist.yaml -y -d',
-        'sleep 5',
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep STARTING',
-        'sleep 900',  # TPU takes a while to launch
-        f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RUNNING\|SUCCEEDED"',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.STARTING],
+            timeout=60 + _BUMP_UP_SECONDS),
+        # TPU takes a while to launch
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[
+                ManagedJobStatus.RUNNING, ManagedJobStatus.SUCCEEDED
+            ],
+            timeout=900 + _BUMP_UP_SECONDS),
     ],
     f'sky jobs cancel -y -n {name}',
     # Increase timeout since sky jobs queue -r can be blocked by other spot tests.
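(Editor's aside, not part of the patch: a hypothetical usage sketch of how the new helper replaces fixed sleeps inside a smoke-test command list. It assumes the surrounding tests/test_smoke.py context, where `_get_cluster_name`, `_BUMP_UP_SECONDS`, `ManagedJobStatus`, and the helper itself are defined; the specific commands and timeouts here are illustrative and not taken from the patch.)

# Hypothetical sketch; assumes it lives inside tests/test_smoke.py.
name = _get_cluster_name()
commands = [
    f'sky jobs launch -n {name} --use-spot "sleep 1000" -y -d',
    # Old style: a fixed 'sleep 300' followed by a one-shot grep of the queue.
    # New style: poll until the job is RUNNING, or fail after the timeout.
    _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
        job_name=name,
        job_status=[ManagedJobStatus.RUNNING],
        timeout=300 + _BUMP_UP_SECONDS),
    f'sky jobs cancel -y -n {name}',
    _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
        job_name=name,
        job_status=[ManagedJobStatus.CANCELLED],
        timeout=120 + _BUMP_UP_SECONDS),
]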
@@ -3538,8 +3692,10 @@ def test_managed_jobs_inline_env(generic_cloud: str):
     'test-managed-jobs-inline-env',
     [
         f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
-        'sleep 20',
-        f'{_GET_JOB_QUEUE} | grep {name} | grep SUCCEEDED',
+        _get_cmd_wait_until_managed_job_status_contains_matching_job_name(
+            job_name=name,
+            job_status=[ManagedJobStatus.SUCCEEDED],
+            timeout=20 + _BUMP_UP_SECONDS),
     ],
     f'sky jobs cancel -y -n {name}',
     # Increase timeout since sky jobs queue -r can be blocked by other spot tests.
@@ -3646,8 +3802,10 @@ def test_azure_start_stop_two_nodes():
         f'sky start -y {name} -i 1',
         f'sky exec --num-nodes=2 {name} examples/azure_start_stop.yaml',
         f'sky logs {name} 2 --status',  # Ensure the job succeeded.
-        'sleep 200',
-        f's=$(sky status -r {name}) && echo "$s" && echo "$s" | grep "INIT\|STOPPED"'
+        _get_cmd_wait_until_cluster_status_contains(
+            cluster_name=name,
+            cluster_status=[ClusterStatus.INIT, ClusterStatus.STOPPED],
+            timeout=200 + _BUMP_UP_SECONDS) +
         f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}'
     ],
     f'sky down -y {name}',
@@ -4658,7 +4816,10 @@ def test_core_api_sky_launch_fast(generic_cloud: str):
                idle_minutes_to_autostop=1,
                fast=True)
     # Sleep to let the cluster autostop
-    time.sleep(120)
+    _get_cmd_wait_until_cluster_status_contains(
+        cluster_name=name,
+        cluster_status=[ClusterStatus.STOPPED],
+        timeout=120)
     # Run it again - should work with fast=True
     sky.launch(task,
                cluster_name=name,