more fixes for smoke tests #4548

Open
wants to merge 55 commits into master

Changes from all commits (55 commits)
33bd131  fix release smoke tests (zpoint, Dec 20, 2024)
d787e21  robust backward compatibility (zpoint, Dec 20, 2024)
49beeec  bug fix (zpoint, Dec 20, 2024)
1f98e11  Merge branch 'master' into dev/zeping/fix_smoke_tests (zpoint, Dec 25, 2024)
b8c86ec  support gke mark (zpoint, Dec 25, 2024)
28ce35d  update retry field (zpoint, Dec 27, 2024)
1aa9e9d  Merge branch 'master' into dev/zeping/fix_smoke_tests (zpoint, Dec 27, 2024)
38a26bb  fix (zpoint, Dec 27, 2024)
e117e5e  fix smoke tests issue (zpoint, Dec 30, 2024)
caa0b86  update command of GCP (zpoint, Dec 31, 2024)
1a878bd  comment for azure fix (zpoint, Dec 31, 2024)
6fd05bb  merge master (zpoint, Jan 1, 2025)
30548d0  clear resources (zpoint, Jan 1, 2025)
5357036  remove legacy_credentials (zpoint, Jan 2, 2025)
3a51d3a  Merge branch 'master' into dev/zeping/fix_smoke_tests (zpoint, Jan 2, 2025)
9ac96b1  bug fix (zpoint, Jan 2, 2025)
81ec3b2  resolve PR comment (zpoint, Jan 6, 2025)
168093a  update comment (zpoint, Jan 6, 2025)
d53b849  text param for subprocess (zpoint, Jan 6, 2025)
1844001  longer timeout (zpoint, Jan 8, 2025)
758c31a  Merge branch 'master' into dev/zeping/fix_smoke_tests (zpoint, Jan 8, 2025)
3ead41b  rename to is_on_gcp (zpoint, Jan 9, 2025)
a85077d  merge master (zpoint, Jan 9, 2025)
744163f  revert initial_delay (zpoint, Jan 9, 2025)
017477d  cache based on boot time (zpoint, Jan 9, 2025)
e701aed  fix command (zpoint, Jan 9, 2025)
76dec93  Merge branch 'master' into dev/zeping/fix_smoke_tests (zpoint, Jan 9, 2025)
ad89aae  test_job_pipeline (zpoint, Jan 9, 2025)
56392fc  Merge branch 'master' into dev/zeping/fix_smoke_tests (zpoint, Jan 10, 2025)
ddcfb0a  fix bug (zpoint, Jan 10, 2025)
834536b  Merge branch 'dev/zeping/fix_smoke_tests' into dev/zeping/more_fix_of… (zpoint, Jan 10, 2025)
1ba1d06  separate different param to different step (zpoint, Jan 12, 2025)
8cd0740  fix bug (zpoint, Jan 12, 2025)
068f409  increase azure delay test (zpoint, Jan 13, 2025)
376259c  fix (zpoint, Jan 13, 2025)
94976c8  increase initial delay for azure (zpoint, Jan 13, 2025)
bfffac6  prevent check (zpoint, Jan 13, 2025)
33ebd38  skip certrain test (zpoint, Jan 13, 2025)
ef0a9d9  fix test_inferentia (zpoint, Jan 13, 2025)
5707b7b  merge master (zpoint, Jan 14, 2025)
60494e8  Revert "increase initial delay for azure" (zpoint, Jan 17, 2025)
2efcbb2  Revert "fix" (zpoint, Jan 17, 2025)
bde519e  Revert "increase azure delay test" (zpoint, Jan 17, 2025)
55ec13e  align with pytest (zpoint, Jan 17, 2025)
cfce6df  SKYPILOT_DISABLE_USAGE_COLLECTION (zpoint, Jan 18, 2025)
fecded8  Merge branch 'master' into dev/zeping/more_fix_of_smoke_tests (zpoint, Jan 18, 2025)
e59d3e2  rename to avoid run by pytest (zpoint, Jan 20, 2025)
91db4a2  filter cloud (zpoint, Jan 20, 2025)
956bf09  remove yaml (zpoint, Jan 20, 2025)
13de6d6  mypy (zpoint, Jan 20, 2025)
3b71853  verify (zpoint, Jan 20, 2025)
80c5d2a  support -k (zpoint, Jan 20, 2025)
1a1c285  generic cloud (zpoint, Jan 20, 2025)
878fd5d  comment (zpoint, Jan 20, 2025)
ba5dcce  remove (zpoint, Jan 20, 2025)
165 changes: 105 additions & 60 deletions .buildkite/generate_pipeline.py
@@ -21,11 +21,11 @@
clouds are not supported yet, smoke tests for those clouds are not generated.
"""

import collections
import os
import random
import re
import subprocess
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

import click
from conftest import cloud_to_pytest_keyword
@@ -59,13 +59,17 @@
'kubernetes': QUEUE_KUBERNETES
}

# Skip tests that we do not have resources for.
SKIP_TESTS = ['test_tpu_pod_slice_gke']

GENERATED_FILE_HEAD = ('# This is an auto-generated Buildkite pipeline by '
'.buildkite/generate_pipeline.py, Please do not '
'edit directly.\n')


def _extract_marked_tests(file_path: str,
filter_marks: List[str]) -> Dict[str, List[str]]:
def _extract_marked_tests(
file_path: str, args: str
) -> Dict[str, Tuple[List[str], List[str], List[Optional[str]]]]:
"""Extract test functions and filter clouds using pytest.mark
from a Python test file.

@@ -79,43 +83,79 @@ def _extract_marked_tests(file_path: str,
rerun failures. Additionally, the parallelism would be controlled by pytest
instead of the buildkite job queue.
"""
cmd = f'pytest {file_path} --collect-only'
cmd = f'pytest {file_path} --collect-only {args}'
output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
matches = re.findall('Collected .+?\.py::(.+?) with marks: \[(.*?)\]',
output.stdout)
function_name_marks_map = {}

# If the user passes --aws, --gcp, --azure, or --kubernetes, change
# DEFAULT_CLOUDS_TO_RUN to the clouds that the user passed.
default_clouds_to_run = DEFAULT_CLOUDS_TO_RUN
args_to_pytest = args.split()
marks_to_pytest = [i[2:] for i in args_to_pytest if i.startswith('--')]
default_clouds_to_run = []
for each_arg in marks_to_pytest:
if each_arg in PYTEST_TO_CLOUD_KEYWORD:
default_clouds_to_run.append(each_arg)
if default_clouds_to_run:
default_clouds_to_run = list(
set(default_clouds_to_run) & set(CLOUD_QUEUE_MAP.keys()))
# If the user passes clouds we don't support, revert back to the default.
if not default_clouds_to_run:
default_clouds_to_run = DEFAULT_CLOUDS_TO_RUN

k_value = None
# Iterate over the argument list to find `-k` and its value
for i, arg in enumerate(args_to_pytest):
if arg == "-k" and i + 1 < len(args_to_pytest):
k_value = args_to_pytest[i + 1]
if arg == "--generic-cloud" and i + 1 < len(args_to_pytest):
default_clouds_to_run = [args_to_pytest[i + 1]]

print(f'default_clouds_to_run: {default_clouds_to_run}')
function_name_marks_map = collections.defaultdict(set)
function_name_param_map = collections.defaultdict(list)

for function_name, marks in matches:
function_name = re.sub(r'\[.*?\]', '', function_name)
clean_function_name = re.sub(r'\[.*?\]', '', function_name)
if clean_function_name in SKIP_TESTS:
continue
if 'skip' in marks:
continue
if k_value is not None and k_value not in function_name:
# TODO(zpoint): support and/or in k_value
continue

marks = marks.replace('\'', '').split(',')
marks = [i.strip() for i in marks]
if function_name not in function_name_marks_map:
function_name_marks_map[function_name] = set(marks)
else:
function_name_marks_map[function_name].update(marks)

function_name_marks_map[clean_function_name].update(marks)

# extract parameter from function name
# example: test_skyserve_new_autoscaler_update[rolling]
# param: rolling
# function_name: test_skyserve_new_autoscaler_update
param = None
if '[' in function_name and 'serve' in marks:
# Only serve tests are slow and flaky, so we separate them
# to different steps for parallel execution
param = re.search('\[(.+?)\]', function_name).group(1)
if param:
function_name_param_map[clean_function_name].append(param)

function_cloud_map = {}
filter_marks = set(filter_marks)
for function_name, marks in function_name_marks_map.items():
if filter_marks and not filter_marks & marks:
continue
clouds_to_include = []
clouds_to_exclude = []
is_serve_test = 'serve' in marks
run_on_gke = 'requires_gke' in marks
for mark in marks:
if mark.startswith('no_'):
clouds_to_exclude.append(mark[3:])
else:
if mark not in PYTEST_TO_CLOUD_KEYWORD:
# This mark does not specify a cloud, so we skip it.
continue
clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])
if mark not in PYTEST_TO_CLOUD_KEYWORD:
# This mark does not specify a cloud, so we skip it.
continue
clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])

clouds_to_include = (clouds_to_include
if clouds_to_include else DEFAULT_CLOUDS_TO_RUN)
clouds_to_include = [
cloud for cloud in clouds_to_include
if cloud not in clouds_to_exclude
]
if clouds_to_include else default_clouds_to_run)
cloud_queue_map = SERVE_CLOUD_QUEUE_MAP if is_serve_test else CLOUD_QUEUE_MAP
final_clouds_to_include = [
cloud for cloud in clouds_to_include if cloud in cloud_queue_map
@@ -132,31 +172,42 @@ def _extract_marked_tests(file_path: str,
f'Warning: {function_name} is marked to run on {clouds_to_include}, '
f'but we only have credentials for {final_clouds_to_include}. '
f'clouds {excluded_clouds} are skipped.')

param_list = function_name_param_map.get(function_name, [None])
if len(param_list) < len(final_clouds_to_include):
# align, so we can zip them together
param_list += [None
] * (len(final_clouds_to_include) - len(param_list))
function_cloud_map[function_name] = (final_clouds_to_include, [
QUEUE_GKE if run_on_gke else cloud_queue_map[cloud]
for cloud in final_clouds_to_include
])
], param_list)

return function_cloud_map
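To make the new argument handling concrete, here is a minimal self-contained sketch (hypothetical values and an assumed keyword mapping, not the PR code) of how an --args string selects clouds and a -k filter, and of the shape of the mapping this function returns:

# Minimal sketch, not the PR code: mirrors the cloud and -k parsing above.
PYTEST_TO_CLOUD_KEYWORD = {'aws': 'aws', 'gcp': 'gcp', 'azure': 'azure',
                           'kubernetes': 'kubernetes'}  # assumed mapping
DEFAULT_CLOUDS_TO_RUN = ['aws', 'gcp']  # assumed default


def sketch_parse(args: str):
    tokens = args.split()
    clouds = [t[2:] for t in tokens
              if t.startswith('--') and t[2:] in PYTEST_TO_CLOUD_KEYWORD]
    k_value = None
    for i, token in enumerate(tokens):
        if token == '-k' and i + 1 < len(tokens):
            k_value = tokens[i + 1]
    return (clouds or DEFAULT_CLOUDS_TO_RUN), k_value


# sketch_parse('--aws -k test_skyserve') returns (['aws'], 'test_skyserve').
# The real function then maps each collected test to a tuple of
# (clouds_to_run, buildkite_queues, params), for example:
# {'test_skyserve_new_autoscaler_update': (['aws'], ['<serve queue>'], ['rolling'])}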


def _generate_pipeline(test_file: str,
filter_marks: List[str],
args: str,
auto_retry: bool = False) -> Dict[str, Any]:
"""Generate a Buildkite pipeline from test files."""
steps = []
function_cloud_map = _extract_marked_tests(test_file, filter_marks)
for test_function, clouds_and_queues in function_cloud_map.items():
for cloud, queue in zip(*clouds_and_queues):
function_cloud_map = _extract_marked_tests(test_file, args)
for test_function, clouds_queues_param in function_cloud_map.items():
for cloud, queue, param in zip(*clouds_queues_param):
label = f'{test_function} on {cloud}'
command = f'pytest {test_file}::{test_function} --{cloud}'
if param:
label += f' with param {param}'
command += f' -k {param}'
step = {
'label': f'{test_function} on {cloud}',
'command': f'pytest {test_file}::{test_function} --{cloud}',
'label': label,
'command': command,
'agents': {
# Separate agent pool for each cloud, since they require
# different amounts of resources and concurrency control.
'queue': queue
},
'if': f'build.env("{cloud}") == "1"'
}
}
if auto_retry:
step['retry'] = {
@@ -170,44 +221,43 @@ def _dump_pipeline_to_file(yaml_file_path: str,
def _dump_pipeline_to_file(yaml_file_path: str,
pipelines: List[Dict[str, Any]],
extra_env: Optional[Dict[str, str]] = None):
default_env = {'LOG_TO_STDOUT': '1', 'PYTHONPATH': '${PYTHONPATH}:$(pwd)'}
default_env = {
'LOG_TO_STDOUT': '1',
'PYTHONPATH': '${PYTHONPATH}:$(pwd)',
'SKYPILOT_DISABLE_USAGE_COLLECTION': '1'
}
if extra_env:
default_env.update(extra_env)
with open(yaml_file_path, 'w', encoding='utf-8') as file:
file.write(GENERATED_FILE_HEAD)
all_steps = []
for pipeline in pipelines:
all_steps.extend(pipeline['steps'])
# Shuffle the steps to avoid flakiness: consecutive runs of the same
# kind of test may fail because they need locks on the same resources.
random.shuffle(all_steps)
final_pipeline = {'steps': all_steps, 'env': default_env}
yaml.dump(final_pipeline, file, default_flow_style=False)
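For a concrete picture of the generated output, a single step for a hypothetical test_example on AWS would look roughly like the dict below; the queue name is an assumption and the retry policy is omitted, since both are defined outside the hunks shown here.

# Illustrative only: the test name and queue label are made up.
example_step = {
    'label': 'test_example on aws',
    'command': 'pytest tests/smoke_tests/test_example.py::test_example --aws',
    'agents': {
        'queue': 'generic_cloud_queue',
    },
    'if': 'build.env("aws") == "1"',
}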


def _convert_release(test_files: List[str], filter_marks: List[str]):
def _convert_release(test_files: List[str], args: str):
yaml_file_path = '.buildkite/pipeline_smoke_tests_release.yaml'
output_file_pipelines = []
for test_file in test_files:
print(f'Converting {test_file} to {yaml_file_path}')
pipeline = _generate_pipeline(test_file, filter_marks, auto_retry=True)
pipeline = _generate_pipeline(test_file, args, auto_retry=True)
output_file_pipelines.append(pipeline)
print(f'Converted {test_file} to {yaml_file_path}\n\n')
# Enable all clouds by default for release pipeline.
_dump_pipeline_to_file(yaml_file_path,
output_file_pipelines,
extra_env={cloud: '1' for cloud in CLOUD_QUEUE_MAP})
_dump_pipeline_to_file(yaml_file_path, output_file_pipelines)


def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):
def _convert_quick_tests_core(test_files: List[str], args: List[str]):
yaml_file_path = '.buildkite/pipeline_smoke_tests_quick_tests_core.yaml'
output_file_pipelines = []
for test_file in test_files:
print(f'Converting {test_file} to {yaml_file_path}')
# We want to enable all clouds by default for each test function
# for pre-merge, and let the author control which clouds
# to run via parameters.
pipeline = _generate_pipeline(test_file, filter_marks)
pipeline = _generate_pipeline(test_file, args)
pipeline['steps'].append({
'label': 'Backward compatibility test',
'command': 'bash tests/backward_compatibility_tests.sh',
@@ -223,11 +273,10 @@ def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):


@click.command()
@click.option(
'--filter-marks',
type=str,
help='Filter to include only a subset of pytest marks, e.g., managed_jobs')
def main(filter_marks):
@click.option('--args',
type=str,
help='Args to pass to pytest, e.g., --managed-jobs --aws')
def main(args):
test_files = os.listdir('tests/smoke_tests')
release_files = []
quick_tests_core_files = []
@@ -240,15 +289,11 @@ def main(filter_marks):
else:
release_files.append(test_file_path)

filter_marks = filter_marks or os.getenv('FILTER_MARKS')
if filter_marks:
filter_marks = filter_marks.split(',')
print(f'Filter marks: {filter_marks}')
else:
filter_marks = []
args = args or os.getenv('ARGS', '')
print(f'args: {args}')

_convert_release(release_files, filter_marks)
_convert_quick_tests_core(quick_tests_core_files, filter_marks)
_convert_release(release_files, args)
_convert_quick_tests_core(quick_tests_core_files, args)


if __name__ == '__main__':
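With this change the generator consumes raw pytest arguments instead of a mark filter, so the pipelines could presumably be regenerated with a command like: python .buildkite/generate_pipeline.py --args "--aws -k test_name", or by exporting the same string via the ARGS environment variable (the test name here is made up).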
12 changes: 6 additions & 6 deletions tests/conftest.py
@@ -121,12 +121,6 @@ def _get_cloud_to_run(config) -> List[str]:


def pytest_collection_modifyitems(config, items):
if config.option.collectonly:
for item in items:
full_name = item.nodeid
marks = [mark.name for mark in item.iter_markers()]
print(f"Collected {full_name} with marks: {marks}")

skip_marks = {}
skip_marks['slow'] = pytest.mark.skip(reason='need --runslow option to run')
skip_marks['managed_jobs'] = pytest.mark.skip(
@@ -191,6 +185,12 @@ def pytest_collection_modifyitems(config, items):
item.add_marker(serial_mark)
item._nodeid = f'{item.nodeid}@serial_{generic_cloud_keyword}' # See comment on item.nodeid above

if config.option.collectonly:
for item in items:
full_name = item.nodeid
marks = [mark.name for mark in item.iter_markers()]
print(f"Collected {full_name} with marks: {marks}")


def _is_generic_test(item) -> bool:
for cloud in all_clouds_in_smoke_tests:
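With the print moved after the mark-modification pass, each collected test is now reported with its final marks in the format that generate_pipeline.py parses; a hypothetical line of --collect-only output (mark names are illustrative):

Collected tests/smoke_tests/test_managed_job.py::test_job_pipeline with marks: ['managed_jobs', 'aws']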
4 changes: 2 additions & 2 deletions tests/smoke_tests/test_cluster_job.py
@@ -493,8 +493,8 @@ def test_inferentia():
test = smoke_tests_utils.Test(
'test_inferentia',
[
f'sky launch -y -c {name} -t inf2.xlarge -- echo hi',
f'sky exec {name} --gpus Inferentia:1 echo hi',
f'sky launch -y -c {name} -t inf2.xlarge --cloud aws -- echo hi',
f'sky exec {name} --gpus Inferentia2:1 echo hi',
f'sky logs {name} 1 --status', # Ensure the job succeeded.
f'sky logs {name} 2 --status', # Ensure the job succeeded.
],
24 changes: 13 additions & 11 deletions tests/smoke_tests/test_managed_job.py
@@ -21,15 +21,17 @@
#
# Change cloud for generic tests to aws
# > pytest tests/smoke_tests/test_managed_job.py --generic-cloud aws

import pathlib
import re
import tempfile
import time

import pytest
from smoke_tests import smoke_tests_utils
from smoke_tests.test_mount_and_storage import TestStorageWithCredentials

# isort: off
from smoke_tests.test_mount_and_storage import (TestStorageWithCredentials as
                                                StorageWithCredentialsUtils)

Author's review comment on this import: Rename it so that it does not start with 'test' and will not be triggered twice.
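The rename matters because pytest's default python_classes = Test* collection rule picks up any module-level name matching Test*, even one that was only imported, so the un-aliased import would make this suite run a second time from this file. A minimal sketch of the two forms (illustrative comments, not part of the diff):

# Collected a second time from this module (the bound name matches 'Test*'):
#     from smoke_tests.test_mount_and_storage import TestStorageWithCredentials
# Not collected here (the bound name no longer matches 'Test*'):
#     from smoke_tests.test_mount_and_storage import (
#         TestStorageWithCredentials as StorageWithCredentialsUtils)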

import sky
from sky import jobs
@@ -103,7 +105,7 @@ def test_job_pipeline(generic_cloud: str):
test = smoke_tests_utils.Test(
'spot-pipeline',
[
f'sky jobs launch -n {name} tests/test_yamls/pipeline.yaml -y -d',
f'sky jobs launch -n {name} tests/test_yamls/pipeline.yaml --cloud {generic_cloud} -y -d',
'sleep 5',
f'{smoke_tests_utils.GET_JOB_QUEUE} | grep {name} | head -n1 | grep "STARTING\|RUNNING"',
# `grep -A 4 {name}` finds the job with {name} and the 4 lines
@@ -709,31 +711,31 @@ def test_managed_jobs_storage(generic_cloud: str):
if generic_cloud == 'aws':
region = 'eu-central-1'
region_flag = f' --region {region}'
region_cmd = TestStorageWithCredentials.cli_region_cmd(
region_cmd = StorageWithCredentialsUtils.cli_region_cmd(
storage_lib.StoreType.S3, bucket_name=storage_name)
region_validation_cmd = f'{region_cmd} | grep {region}'
s3_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
s3_check_file_count = StorageWithCredentialsUtils.cli_count_name_in_bucket(
storage_lib.StoreType.S3, output_storage_name, 'output.txt')
output_check_cmd = f'{s3_check_file_count} | grep 1'
elif generic_cloud == 'gcp':
region = 'us-west2'
region_flag = f' --region {region}'
region_cmd = TestStorageWithCredentials.cli_region_cmd(
region_cmd = StorageWithCredentialsUtils.cli_region_cmd(
storage_lib.StoreType.GCS, bucket_name=storage_name)
region_validation_cmd = f'{region_cmd} | grep {region}'
gcs_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
gcs_check_file_count = StorageWithCredentialsUtils.cli_count_name_in_bucket(
storage_lib.StoreType.GCS, output_storage_name, 'output.txt')
output_check_cmd = f'{gcs_check_file_count} | grep 1'
elif generic_cloud == 'azure':
region = 'centralus'
region_flag = f' --region {region}'
storage_account_name = (
storage_lib.AzureBlobStore.get_default_storage_account_name(region))
region_cmd = TestStorageWithCredentials.cli_region_cmd(
region_cmd = StorageWithCredentialsUtils.cli_region_cmd(
storage_lib.StoreType.AZURE,
storage_account_name=storage_account_name)
region_validation_cmd = f'{region_cmd} | grep {region}'
az_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
az_check_file_count = StorageWithCredentialsUtils.cli_count_name_in_bucket(
storage_lib.StoreType.AZURE,
output_storage_name,
'output.txt',
@@ -742,10 +744,10 @@
elif generic_cloud == 'kubernetes':
# With Kubernetes, we don't know which object storage provider is used.
# Check both S3 and GCS if bucket exists in either.
s3_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
s3_check_file_count = StorageWithCredentialsUtils.cli_count_name_in_bucket(
storage_lib.StoreType.S3, output_storage_name, 'output.txt')
s3_output_check_cmd = f'{s3_check_file_count} | grep 1'
gcs_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
gcs_check_file_count = StorageWithCredentialsUtils.cli_count_name_in_bucket(
storage_lib.StoreType.GCS, output_storage_name, 'output.txt')
gcs_output_check_cmd = f'{gcs_check_file_count} | grep 1'
output_check_cmd = f'{s3_output_check_cmd} || {gcs_output_check_cmd}'