diff --git a/.gitignore b/.gitignore
index 31afa021418..71268bbfc71 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,4 +30,5 @@
 main.js.map
 .idea
 .coverage
-.coverage.*
\ No newline at end of file
+.coverage.*
+pytest-logs
\ No newline at end of file
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index e1d84ed9f74..97fe4edcdb8 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -72,7 +72,7 @@ stages:
 
       ${BUILT_IMAGE_FULL_PATH} bash -c "
         cd /home/zservice/metaflow/metaflow/plugins/kfp/tests &&
-        python -m pytest -s -n 3 run_integration_tests.py --image ${BUILT_IMAGE_FULL_PATH} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg
+        python -m pytest -s -n 3 run_integration_tests.py --image ${BUILT_IMAGE_FULL_PATH} --opsgenie-api-token ${OPSGENIE_API_TOKEN} --cov-config=setup.cfg --public-dir /home/zservice/public
       "
   artifacts:
     when: always
diff --git a/metaflow/plugins/kfp/kfp_cli.py b/metaflow/plugins/kfp/kfp_cli.py
index 9d7e79f3c69..dfc89107446 100644
--- a/metaflow/plugins/kfp/kfp_cli.py
+++ b/metaflow/plugins/kfp/kfp_cli.py
@@ -1,6 +1,7 @@
 import json
 import shutil
 import subprocess
+import logging
 
 from metaflow import JSONType, current, decorators, parameters
 from metaflow._vendor import click
@@ -225,6 +226,13 @@ def step_init(obj, run_id, step_name, passed_in_split_indexes, task_id):
     "If not set, METAFLOW_NOTIFY_ON_SUCCESS is used from Metaflow config or environment variable",
     show_default=True,
 )
+@click.option(
+    "--verbose/--no-verbose",
+    "verbose",
+    default=False,
+    help="Turns on debug logging.",
+    show_default=True,
+)
 @click.pass_obj
 def run(
     obj,
@@ -248,12 +256,18 @@ def run(
     notify_on_error=None,
     notify_on_success=None,
     argo_wait=False,
+    verbose=False,
     **kwargs,
 ):
     """
     Analogous to step_functions_cli.py
     """
 
+    if verbose:
+        # Enable debug logging on the root logger so that we also see logs emitted by the
+        # KFP SDK, which logs to the root logger by default.
+        logging.basicConfig(level=logging.DEBUG)
+
     def _convert_value(param: parameters.Parameter):
         v = kwargs.get(param.name)
         return json.dumps(v) if param.kwargs.get("type") == JSONType else v
diff --git a/metaflow/plugins/kfp/tests/conftest.py b/metaflow/plugins/kfp/tests/conftest.py
index f19753afd1e..a10e08075d7 100644
--- a/metaflow/plugins/kfp/tests/conftest.py
+++ b/metaflow/plugins/kfp/tests/conftest.py
@@ -1,3 +1,14 @@
+import logging
+import sys
+from pathlib import Path
+from io import TextIOBase
+import pytest
+
+
+# Use the root logger for stdout/stderr patching.
+logger = logging.getLogger()
+
+
 def pytest_addoption(parser):
     """
     The image on Artifactory that corresponds to the currently
@@ -7,3 +18,46 @@ def pytest_addoption(parser):
     parser.addoption(
         "--opsgenie-api-token", dest="opsgenie_api_token", action="store", default=None
     )
+    parser.addoption("--public-dir", dest="public_dir", action="store", default="")
+
+
+class StreamToLogger(TextIOBase):
+    """Fake file-like stream object that redirects writes to a logger instance."""
+
+    def __init__(self, logger, level, original_stream):
+        self.logger = logger
+        self.level = level
+        self.linebuf = ""
+        self.original_stream = original_stream
+
+    def write(self, buf):
+        for line in buf.rstrip().splitlines():
+            self.logger.log(self.level, line.rstrip())
+
+        # Write through so the output still reaches the original stream as well.
+        self.original_stream.write(buf)
+
+    def flush(self):
+        pass
+
+
+@pytest.hookimpl(hookwrapper=True, tryfirst=True)
+def pytest_runtest_setup(item):
+    """Emit a log file per test to make it easier to debug in failure scenarios.
+
+    Sourced from: https://stackoverflow.com/a/64480499
+    """
+    logging_plugin = item.config.pluginmanager.get_plugin("logging-plugin")
+    filename = Path(
+        item.config.getoption("public_dir"),
+        "pytest-logs",
+        f"{item._request.node.name}.log",
+    )
+    logging_plugin.set_log_path(str(filename))
+
+    # Forward logs from stdout/stderr to the logger as well.
+    if not isinstance(sys.stdout, StreamToLogger):
+        sys.stdout = StreamToLogger(logger, logging.INFO, sys.stdout)
+    if not isinstance(sys.stderr, StreamToLogger):
+        sys.stderr = StreamToLogger(logger, logging.INFO, sys.stderr)
+    yield
diff --git a/metaflow/plugins/kfp/tests/run_integration_tests.py b/metaflow/plugins/kfp/tests/run_integration_tests.py
index e06cd5fe60f..5212f6a01a1 100644
--- a/metaflow/plugins/kfp/tests/run_integration_tests.py
+++ b/metaflow/plugins/kfp/tests/run_integration_tests.py
@@ -58,6 +58,8 @@ def _python():
 
 
 def obtain_flow_file_paths(flow_dir_path: str) -> List[str]:
+    # TODO AIP-6643 Revert this
+    return ["nested_foreach_with_branching.py"]
     file_paths: List[str] = [
         file_name
         for file_name in listdir(flow_dir_path)
@@ -68,6 +70,7 @@ def obtain_flow_file_paths(flow_dir_path: str) -> List[str]:
     return file_paths
 
 
+@pytest.mark.skip
 def test_s3_sensor_flow(pytestconfig) -> None:
     # ensure the s3_sensor waits for some time before the key exists
     file_name: str = f"s3-sensor-file-{uuid.uuid1()}.txt"
@@ -131,6 +134,7 @@ def test_s3_sensor_flow(pytestconfig) -> None:
 
 # This test ensures that a flow fails correctly,
 # and when it fails, an OpsGenie email is sent.
+@pytest.mark.skip
 def test_error_and_opsgenie_alert(pytestconfig) -> None:
     raise_error_flow_cmd: str = (
         f"{_python()} flows/raise_error_flow.py --datastore=s3 kfp run "
@@ -210,7 +214,7 @@ def test_flows(pytestconfig, flow_file_path: str) -> None:
 
     test_cmd: str = (
         f"{_python()} {full_path} --datastore=s3 --with retry kfp run "
-        f"--wait-for-completion --workflow-timeout 1800 "
+        f"--wait-for-completion --workflow-timeout 1800 --verbose "
        f"--max-parallelism 3 --experiment metaflow_test --tag test_t1 "
         f"--sys-tag test_sys_t1:sys_tag_value "
     )
@@ -232,7 +236,8 @@ def run_cmd_with_backoff_from_platform_errors(
 
     # as well as output to stdout and stderr (which users can see on the Gitlab logs). We check
     # if the error message is due to a KFAM issue, and if so, we do an exponential backoff.
-    backoff_intervals_in_seconds: List[int] = [0, 2, 4, 8, 16, 32]
+    # TODO AIP-6643 Limit retries so we get all the logs
+    backoff_intervals_in_seconds: List[int] = [0, 2, 4]  # , 8, 16, 32]
 
     platform_error_messages: List[str] = [
         "Reason: Unauthorized",
@@ -316,6 +321,7 @@ def get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path) -> Dict[str, str]:
     return flow_yaml
 
 
+@pytest.mark.skip
 def test_kubernetes_service_account_compile_only() -> None:
     service_account = "test-service-account"
     with tempfile.TemporaryDirectory() as yaml_tmp_dir:
@@ -332,6 +338,7 @@ def test_kubernetes_service_account_compile_only() -> None:
     assert flow_yaml["spec"]["serviceAccountName"] == service_account
 
 
+@pytest.mark.skip
 def test_toleration_and_affinity_compile_only() -> None:
     step_templates: Dict[str, str] = {}
     with tempfile.TemporaryDirectory() as yaml_tmp_dir:
diff --git a/metaflow/plugins/kfp/tests/setup.cfg b/metaflow/plugins/kfp/tests/setup.cfg
index ff518713bf1..3f0f17e2ccb 100644
--- a/metaflow/plugins/kfp/tests/setup.cfg
+++ b/metaflow/plugins/kfp/tests/setup.cfg
@@ -13,7 +13,8 @@ exclude_lines =
 
 # pytest
 [tool:pytest]
-addopts = -vv --cov=/home/zservice/metaflow/metaflow/plugins/kfp --cov-report term --cov-report html
+addopts = -vv --cov=/home/zservice/metaflow/metaflow/plugins/kfp --cov-report term --cov-report html --log-file-level debug --log-cli-level info --log-file-format "%(asctime)s %(threadName)s %(filename)s:%(lineno)d %(levelname)s - %(message)s" --log-file-date-format "%Y-%m-%d %H:%M:%S"
+
 
 [html]
-directory = /home/zservice/public
+directory = /home/zservice/public/coverage
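
For reference, a minimal standalone sketch (not part of the patch) of the stdout-to-logger redirection that the conftest.py change relies on. The demo.log path and root_logger name here are hypothetical; in the test suite the file handler is managed by pytest's logging-plugin via set_log_path instead of logging.basicConfig:

    import logging
    import sys
    from io import TextIOBase

    # A file handler on the root logger stands in for pytest's per-test log file.
    logging.basicConfig(filename="demo.log", level=logging.INFO)
    root_logger = logging.getLogger()

    class StreamToLogger(TextIOBase):
        """Mirror writes into a logger while passing them through to the real stream."""

        def __init__(self, logger, level, original_stream):
            self.logger = logger
            self.level = level
            self.original_stream = original_stream

        def write(self, buf):
            for line in buf.rstrip().splitlines():
                self.logger.log(self.level, line.rstrip())
            # Write through so the output still shows up on the console.
            self.original_stream.write(buf)

    sys.stdout = StreamToLogger(root_logger, logging.INFO, sys.stdout)
    print("hello")  # reaches the console and is also recorded in demo.log

Because the logging handlers write to a file rather than to sys.stdout, there is no recursion, and each test's print/stderr output ends up in its per-test log file alongside regular log records.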