Skip to content

Commit

Permalink
Writing log messages when MLCube fails to validate runner configurati…
Browse files Browse the repository at this point in the history
…on. (#330)
  • Loading branch information
sergey-serebryakov authored Aug 2, 2023
1 parent 0f62e81 commit 1955fc9
Showing 1 changed file with 97 additions and 50 deletions.
147 changes: 97 additions & 50 deletions mlcube/mlcube/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@
import os
import typing as t

from mlcube.runner import Runner

from omegaconf import (DictConfig, OmegaConf)
from omegaconf import DictConfig, OmegaConf

from mlcube.runner import Runner

logger = logging.getLogger(__name__)

__all__ = ['IOType', 'ParameterType', 'MountType', 'MLCubeConfig']
__all__ = ["IOType", "ParameterType", "MountType", "MLCubeConfig"]


class IOType(object):
"""Input/output type of MLCube task parameter."""

INPUT = 'input'
INPUT = "input"
"""This parameter is input parameter (e.g., path to data)."""

OUTPUT = 'output'
OUTPUT = "output"
"""This parameter is output parameter (e.g., path to trained model)."""

@staticmethod
Expand All @@ -36,28 +35,32 @@ def is_valid(io: str) -> bool:
class ParameterType(object):
"""Type of MLCube task parameter."""

FILE = 'file'
FILE = "file"
"""This parameter is a file."""

DIRECTORY = 'directory'
DIRECTORY = "directory"
"""This parameter is a directory."""

UNKNOWN = 'unknown'
UNKNOWN = "unknown"
"""Type is unknown (only used internally)."""

@staticmethod
def is_valid(io: str) -> bool:
"""Return true if string `io` contain valid parameter type."""
return io in (ParameterType.FILE, ParameterType.DIRECTORY, ParameterType.UNKNOWN)
return io in (
ParameterType.FILE,
ParameterType.DIRECTORY,
ParameterType.UNKNOWN,
)


class MountType(object):
"""Read-Write (rw) or Read-Only type of MLCube mount parameter."""

RW = 'rw'
RW = "rw"
"""This parameter is reads-write parameter (e.g., path to data)."""

RO = 'ro'
RO = "ro"
"""This parameter is read-only parameter"""

@staticmethod
Expand All @@ -70,7 +73,9 @@ class MLCubeConfig(object):
"""Utilities to assemble effective MLCube configuration."""

@staticmethod
def ensure_values_exist(config: DictConfig, keys: t.Union[str, t.List], constructor: t.Callable) -> t.List:
def ensure_values_exist(
config: DictConfig, keys: t.Union[str, t.List], constructor: t.Callable
) -> t.List:
"""Make sure the `config` dictionary contains specified keys.
Args:
Expand All @@ -88,15 +93,20 @@ def ensure_values_exist(config: DictConfig, keys: t.Union[str, t.List], construc
@staticmethod
def get_uri(value: str) -> str:
"""Validate `value` is a valid URI."""
if value.startswith('storage:'):
if value.startswith("storage:"):
raise ValueError("Storage schema is not yet supported")
return os.path.abspath(os.path.expanduser(value))

@staticmethod
def create_mlcube_config(mlcube_config_file: str, mlcube_cli_args: t.Optional[DictConfig] = None,
task_cli_args: t.Optional[t.Dict] = None, runner_config: t.Optional[DictConfig] = None,
workspace: t.Optional[str] = None, resolve: bool = True,
runner_cls: t.Optional[t.Type[Runner]] = None) -> DictConfig:
def create_mlcube_config(
mlcube_config_file: str,
mlcube_cli_args: t.Optional[DictConfig] = None,
task_cli_args: t.Optional[t.Dict] = None,
runner_config: t.Optional[DictConfig] = None,
workspace: t.Optional[str] = None,
resolve: bool = True,
runner_cls: t.Optional[t.Type[Runner]] = None,
) -> DictConfig:
"""Create MLCube configuration merging different configs - base, global, local and cli.
Args:
Expand All @@ -117,7 +127,11 @@ def create_mlcube_config(mlcube_config_file: str, mlcube_cli_args: t.Optional[Di
logger.debug(
"MLCubeConfig.create_mlcube_config input_arg mlcube_config_file=%s, mlcube_cli_args=%s, task_cli_args=%s, "
"runner_config=%s, workspace=%s",
mlcube_config_file, mlcube_cli_args, task_cli_args, runner_config, workspace
mlcube_config_file,
mlcube_cli_args,
task_cli_args,
runner_config,
workspace,
)
if mlcube_cli_args is None:
mlcube_cli_args = OmegaConf.create({})
Expand All @@ -127,23 +141,29 @@ def create_mlcube_config(mlcube_config_file: str, mlcube_cli_args: t.Optional[Di
runner_config = OmegaConf.create({})

# Load MLCube configuration and maybe override parameters from command line (like -Pdocker.build_strategy=...).
actual_workspace = '${runtime.root}/workspace' if workspace is None else MLCubeConfig.get_uri(workspace)
actual_workspace = (
"${runtime.root}/workspace"
if workspace is None
else MLCubeConfig.get_uri(workspace)
)
mlcube_config = OmegaConf.merge(
OmegaConf.load(mlcube_config_file), # MLCube configuration file.
mlcube_cli_args, # MLCube parameters from command line.
OmegaConf.create({ # Section defining runtime parameters.
'runtime': {
'root': os.path.dirname(mlcube_config_file),
'workspace': actual_workspace
},
'runner': runner_config # Effective (final) runner configuration.
})
OmegaConf.load(mlcube_config_file), # MLCube configuration file.
mlcube_cli_args, # MLCube parameters from command line.
OmegaConf.create(
{ # Section defining runtime parameters.
"runtime": {
"root": os.path.dirname(mlcube_config_file),
"workspace": actual_workspace,
},
"runner": runner_config, # Effective (final) runner configuration.
}
),
)
# Maybe this is not the best idea, but originally MLCube used $WORKSPACE token to refer to the internal
# workspace. So, this value is here to simplify access to workspace value. BTW, in general, if files are to be
# located inside workspace (internal or custom), users are encouraged not to use ${runtime.workspace} or
# ${workspace} in their MLCube configuration files.
mlcube_config['workspace'] = actual_workspace
mlcube_config["workspace"] = actual_workspace
# Merge, for instance, docker runner config from system settings with docker config from MLCube config.
if runner_cls:
# Make sure all default parameters are present - this can be done automatically for all runners (so that
Expand All @@ -155,18 +175,31 @@ def create_mlcube_config(mlcube_config_file: str, mlcube_cli_args: t.Optional[Di
# Need to apply CLI arguments again just in case users provided something like -Prunner.build_strategy=...
mlcube_config = OmegaConf.merge(mlcube_config, mlcube_cli_args)
if runner_cls:
runner_cls.CONFIG.validate(mlcube_config)
try:
runner_cls.CONFIG.validate(mlcube_config)
except Exception as err:
logger.error(
"MLCubeConfig.create_mlcube_config failed to validate MLCube config (%s): %s.",
OmegaConf.to_container(mlcube_config, resolve=False),
str(err),
)
raise

for task_name in mlcube_config.tasks.keys():
[task] = MLCubeConfig.ensure_values_exist(mlcube_config.tasks, task_name, dict)
if 'entrypoint' in task and task['entrypoint'] is None:
[task] = MLCubeConfig.ensure_values_exist(
mlcube_config.tasks, task_name, dict
)
if "entrypoint" in task and task["entrypoint"] is None:
logger.warning(
"MLCube task (%s) specifies an entrypoint that is None: removing it (a default "
"entrypoint will be used).", task_name
"entrypoint will be used).",
task_name,
)
task.pop('entrypoint')
[parameters] = MLCubeConfig.ensure_values_exist(task, 'parameters', dict)
[inputs, outputs] = MLCubeConfig.ensure_values_exist(parameters, ['inputs', 'outputs'], dict)
task.pop("entrypoint")
[parameters] = MLCubeConfig.ensure_values_exist(task, "parameters", dict)
[inputs, outputs] = MLCubeConfig.ensure_values_exist(
parameters, ["inputs", "outputs"], dict
)

MLCubeConfig.check_parameters(inputs, task_cli_args)
MLCubeConfig.check_parameters(outputs, task_cli_args)
Expand All @@ -176,7 +209,9 @@ def create_mlcube_config(mlcube_config_file: str, mlcube_cli_args: t.Optional[Di
return mlcube_config

@staticmethod
def merge_with_logging(mlcube_config: DictConfig, default_runner_config: DictConfig) -> None:
def merge_with_logging(
mlcube_config: DictConfig, default_runner_config: DictConfig
) -> None:
"""Merge default runner config with current effective runner config.
The goal is to make sure the effective configuration contains all parameters accepted by the runner so that this
Expand All @@ -186,17 +221,19 @@ def merge_with_logging(mlcube_config: DictConfig, default_runner_config: DictCon
mlcube_config: Current effective MLCube configuration.
default_runner_config: Default runner configuration.
"""
params_to_merge = [k for k in default_runner_config.keys() if k not in mlcube_config['runner']]
params_to_merge = [
k for k in default_runner_config.keys() if k not in mlcube_config["runner"]
]
if params_to_merge:
logger.warning(
"Default runner config contains parameters that are not present in the effective runner config "
"(params=%s). This probably means that a new version of a runner was installed that introduced "
"new parameters.",
str(params_to_merge)
str(params_to_merge),
)
current_effective_cfg = mlcube_config['runner']
current_effective_cfg = mlcube_config["runner"]
mlcube_config["runner"] = default_runner_config.copy()
mlcube_config.merge_with({'runner': current_effective_cfg})
mlcube_config.merge_with({"runner": current_effective_cfg})

@staticmethod
def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None:
Expand All @@ -211,30 +248,38 @@ def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None:
# a path separator at the end of the parameter value as a hint that the type of this parameter is a directory
# (when not specified by a user). We should not relly on `os.sep` since MLCubes are expected to run in different
# environments (e.g., Unix and Windows).
separators = ('/', '\\')
separators = ("/", "\\")
if os.sep not in separators:
logger.warning("The os-specific path separator ('%s') not in list of standard separators.", os.sep)
logger.warning(
"The os-specific path separator ('%s') not in list of standard separators.",
os.sep,
)
if os.altsep is not None and os.altsep not in separators:
logger.warning(
"The os-specific alternative path separator ('%s') not in list of standard separators.", os.altsep
"The os-specific alternative path separator ('%s') not in list of standard separators.",
os.altsep,
)
#
for name in parameters.keys():
# The `_param_name` is anyway there, so check it's not None.
[param_def] = MLCubeConfig.ensure_values_exist(parameters, name, dict)
# Deal with the case when value is a string (default value).
if isinstance(param_def, str):
parameters[name] = {'default': param_def}
parameters[name] = {"default": param_def}
param_def = parameters[name]
# If `default` key is not present, use parameter name as value.
_ = MLCubeConfig.ensure_values_exist(param_def, 'default', lambda: name)
_ = MLCubeConfig.ensure_values_exist(param_def, "default", lambda: name)
# One challenge is how to identify type (file, directory) of input/output parameters if users have
# not provided these types. The below is a kind of rule-based system that tries to infer types.

# Make sure every parameter definition contains 'type' field. Also, if it's unknown, we can assume it's a
# directory if a value ends with forward/backward slash.
_ = MLCubeConfig.ensure_values_exist(param_def, 'type', lambda: ParameterType.UNKNOWN)
if param_def.type == ParameterType.UNKNOWN and param_def.default.endswith(separators):
_ = MLCubeConfig.ensure_values_exist(
param_def, "type", lambda: ParameterType.UNKNOWN
)
if param_def.type == ParameterType.UNKNOWN and param_def.default.endswith(
separators
):
param_def.type = ParameterType.DIRECTORY
# See if there is value on a command line
param_def.default = task_cli_args.get(name, param_def.default)
Expand All @@ -243,7 +288,9 @@ def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None:
# if we can do the same with user-provided values.
# TODO: what if a parameter in mlcube.yaml is declared to be a file, but users provided something with
# slash at the end.
if param_def.type == ParameterType.UNKNOWN and param_def.default.endswith(separators):
if param_def.type == ParameterType.UNKNOWN and param_def.default.endswith(
separators
):
param_def.type = ParameterType.DIRECTORY

# TODO: For some input parameters, that generally speaking must exist, we can figure out types later,
Expand Down

0 comments on commit 1955fc9

Please sign in to comment.