diff --git a/tesk/api/ga4gh/tes/controllers.py b/tesk/api/ga4gh/tes/controllers.py index 466053d..653d3b3 100644 --- a/tesk/api/ga4gh/tes/controllers.py +++ b/tesk/api/ga4gh/tes/controllers.py @@ -3,9 +3,14 @@ import logging # from connexion import request # type: ignore +from typing import Any + from foca.utils.logging import log_traffic # type: ignore +from tesk.api.ga4gh.tes.models import TesTask from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo +from tesk.api.ga4gh.tes.task.create_task import CreateTesTask +from tesk.exceptions import InternalServerError # Get logger instance logger = logging.getLogger(__name__) @@ -26,14 +31,19 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore # POST /tasks @log_traffic -def CreateTask(*args, **kwargs) -> dict: # type: ignore +def CreateTask(**kwargs) -> dict: # type: ignore """Create task. Args: - *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. """ - pass + try: + request_body: Any = kwargs.get("body") + tes_task = TesTask(**request_body) + response = CreateTesTask(tes_task).response() + return response + except Exception as e: + raise InternalServerError from e # GET /tasks/service-info diff --git a/tesk/api/ga4gh/tes/models.py b/tesk/api/ga4gh/tes/models.py index c442e1e..54bb6ba 100644 --- a/tesk/api/ga4gh/tes/models.py +++ b/tesk/api/ga4gh/tes/models.py @@ -275,7 +275,7 @@ class TesResources(BaseModel): example={"VmSize": "Standard_D64_v3"}, ) backend_parameters_strict: Optional[bool] = Field( - False, + default=False, description="If set to true, backends should fail the task if any " "backend_parameters\nkey/values are unsupported, otherwise, backends should " "attempt to run the task", diff --git a/tesk/api/ga4gh/tes/task/__init__.py b/tesk/api/ga4gh/tes/task/__init__.py new file mode 100644 index 0000000..b8e71d8 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/__init__.py @@ -0,0 +1 @@ +"""Task API controller logic.""" diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py new file mode 100644 index 0000000..17568d4 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -0,0 +1,75 @@ +"""TESK API module for creating a task.""" + +import logging + +from tesk.api.ga4gh.tes.models import TesCreateTaskResponse, TesResources, TesTask +from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest +from tesk.exceptions import KubernetesError + +logger = logging.getLogger(__name__) + + +class CreateTesTask(TesTaskRequest): + """Create TES task.""" + + def __init__( + self, + task: TesTask, + ): + """Initialize the CreateTask class. + + Args: + task: TES task to create. + """ + super().__init__() + self.task = task + + def handle_request(self) -> TesCreateTaskResponse: + """Create TES task.""" + attempts_no = 0 + while ( + attempts_no < self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO + ): + try: + attempts_no += 1 + resources = self.task.resources + minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb() + + if not self.task.resources: + self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb)) + if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb: + self.task.resources.ram_gb = minimum_ram_gb + + taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job( + self.task, + ) + taskmaster_config_map = ( + self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map( + self.task, + taskmaster_job, + ) + ) + + _ = self.kubernetes_client_wrapper.create_config_map( + taskmaster_config_map + ) + created_job = self.kubernetes_client_wrapper.create_job(taskmaster_job) + + assert created_job.metadata is not None + assert created_job.metadata.name is not None + + return TesCreateTaskResponse(id=created_job.metadata.name) + + except KubernetesError as e: + if ( + not e.is_object_name_duplicated() + or attempts_no + >= self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO + ): + raise e + + except Exception as exc: + logging.error("ERROR: In createTask", exc_info=True) + raise exc + + return TesCreateTaskResponse(id="") # To silence mypy, should never be reached diff --git a/tesk/api/ga4gh/tes/task/task_request.py b/tesk/api/ga4gh/tes/task/task_request.py new file mode 100644 index 0000000..7417a03 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/task_request.py @@ -0,0 +1,38 @@ +"""Base class for tesk request.""" + +import json +import logging +from abc import ABC, abstractmethod + +from pydantic import BaseModel + +from tesk.k8s.constants import tesk_k8s_constants +from tesk.k8s.converter.converter import TesKubernetesConverter +from tesk.k8s.wrapper import KubernetesClientWrapper + +logger = logging.getLogger(__name__) + + +class TesTaskRequest(ABC): + """Base class for tesk request ecapsulating common methods and members.""" + + def __init__(self): + """Initialise base class for tesk request.""" + self.kubernetes_client_wrapper = KubernetesClientWrapper() + self.tes_kubernetes_converter = TesKubernetesConverter() + self.tesk_k8s_constants = tesk_k8s_constants + + @abstractmethod + def handle_request(self) -> BaseModel: + """Business logic for the request.""" + pass + + def response(self) -> dict: + """Get response for the request.""" + response: BaseModel = self.handle_request() + try: + res: dict = json.loads(json.dumps(response)) + return res + except (TypeError, ValueError) as e: + logger.info(e) + return response.dict() diff --git a/tesk/constants.py b/tesk/constants.py index ff856bc..a301504 100644 --- a/tesk/constants.py +++ b/tesk/constants.py @@ -27,8 +27,8 @@ class TeskConstants(BaseModel): TASKMASTER_IMAGE_NAME: str = "docker.io/elixircloud/tesk-core-taskmaster" TASKMASTER_IMAGE_VERSION: str = "latest" TASKMASTER_SERVICE_ACCOUNT_NAME: str = "taskmaster" - FILER_BACKOFF_LIMIT: int = 2 - EXECUTOR_BACKOFF_LIMIT: int = 2 + FILER_BACKOFF_LIMIT: str = "2" + EXECUTOR_BACKOFF_LIMIT: str = "2" class Config: """Configuration for class.""" diff --git a/tesk/custom_config.py b/tesk/custom_config.py index 5adaccf..4b584b1 100644 --- a/tesk/custom_config.py +++ b/tesk/custom_config.py @@ -60,8 +60,8 @@ class Taskmaster(BaseModel): environment: Optional[Dict[str, str]] = None serviceAccountName: str = tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME executorSecret: Optional[ExecutorSecret] = None - filerBackoffLimit: int = tesk_constants.FILER_BACKOFF_LIMIT - executorBackoffLimit: int = tesk_constants.EXECUTOR_BACKOFF_LIMIT + filerBackoffLimit: str = tesk_constants.FILER_BACKOFF_LIMIT + executorBackoffLimit: str = tesk_constants.EXECUTOR_BACKOFF_LIMIT class CustomConfig(BaseModel): diff --git a/tesk/exceptions.py b/tesk/exceptions.py index d270678..35471f1 100644 --- a/tesk/exceptions.py +++ b/tesk/exceptions.py @@ -1,5 +1,7 @@ """App exceptions.""" +from http import HTTPStatus + from connexion.exceptions import ( BadRequestProblem, ExtraParameterProblem, @@ -26,6 +28,10 @@ class ConfigInvalidError(ValueError): class KubernetesError(ApiException): """Kubernetes error.""" + def is_object_name_duplicated(self) -> bool: + """Check if object name is duplicated.""" + return self.status == HTTPStatus.CONFLICT + # exceptions raised in app context exceptions = { diff --git a/tesk/k8s/converter/__init__.py b/tesk/k8s/converter/__init__.py new file mode 100644 index 0000000..33ac4cc --- /dev/null +++ b/tesk/k8s/converter/__init__.py @@ -0,0 +1 @@ +"""Module for converting Kubernetes objects to Task objects.""" diff --git a/tesk/k8s/converter/converter.py b/tesk/k8s/converter/converter.py new file mode 100644 index 0000000..d3ca901 --- /dev/null +++ b/tesk/k8s/converter/converter.py @@ -0,0 +1,312 @@ +"""Module for converting TES tasks to Kubernetes jobs.""" + +import base64 +import gzip +import json +import logging +from decimal import Decimal +from enum import Enum +from io import BytesIO +from typing import Any, Optional + +from kubernetes.client import ( + V1ConfigMap, + V1ConfigMapVolumeSource, + V1Container, + V1EnvVar, + V1JobSpec, + V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, + V1ResourceRequirements, + V1Volume, +) +from kubernetes.client.models import V1Job +from kubernetes.utils.quantity import parse_quantity # type: ignore + +from tesk.api.ga4gh.tes.models import ( + TesExecutor, + TesResources, + TesTask, +) +from tesk.custom_config import Taskmaster +from tesk.k8s.constants import tesk_k8s_constants +from tesk.k8s.converter.data.job import Job +from tesk.k8s.converter.data.task import Task +from tesk.k8s.converter.executor_command_wrapper import ExecutorCommandWrapper +from tesk.k8s.converter.template import KubernetesTemplateSupplier +from tesk.k8s.wrapper import KubernetesClientWrapper +from tesk.utils import ( + get_taskmaster_env_property, + pydantic_model_list_dict, +) + +logger = logging.getLogger(__name__) + + +class TesKubernetesConverter: + """Convert TES requests to Kubernetes resources.""" + + def __init__(self): + """Initialize the converter.""" + self.taskmaster_env_properties: Taskmaster = get_taskmaster_env_property() + self.template_supplier = KubernetesTemplateSupplier() + self.tesk_k8s_constants = tesk_k8s_constants + self.kubernetes_client_wrapper = KubernetesClientWrapper() + + def from_tes_task_to_k8s_job(self, task: TesTask): + """Convert TES task to Kubernetes job.""" + taskmaster_job: V1Job = ( + self.template_supplier.get_taskmaster_template_with_value_from_config() + ) + + if taskmaster_job.metadata is None: + taskmaster_job.metadata = V1ObjectMeta() + + if taskmaster_job.metadata.annotations is None: + taskmaster_job.metadata.annotations = {} + + if taskmaster_job.metadata.labels is None: + taskmaster_job.metadata.labels = {} + + if task.name: + taskmaster_job.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY + ] = task.name + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = user[ + # "username" + # ] + + # if task.tags and "GROUP_NAME" in task.tags: + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = task[ + # "tags" + # ]["GROUP_NAME"] + # elif user["is_member"]: + # taskmaster_job.metadata.labels[self.constants.label_groupname_key] = user[ + # "any_group" + # ] + + json_input = json.dumps( + task.dict(), + indent=2, + default=lambda enum: str(enum.name) if isinstance(enum, Enum) else None, + ) + + try: + taskmaster_job.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_JSON_INPUT_KEY + ] = json_input + except Exception as ex: + logger.info( + f"Serializing task {taskmaster_job.metadata.name} to JSON failed", ex + ) + + volume = V1Volume( + name="jsoninput", + config_map=V1ConfigMapVolumeSource(name=taskmaster_job.metadata.name), + ) + + if taskmaster_job.spec is None: + taskmaster_job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if taskmaster_job.spec.template.spec is None: + taskmaster_job.spec.template.spec = V1PodSpec(containers=[]) + if taskmaster_job.spec.template.spec.volumes is None: + taskmaster_job.spec.template.spec.volumes = [] + + taskmaster_job.spec.template.spec.volumes.append(volume) + return taskmaster_job + + def from_tes_task_to_k8s_config_map( + self, + task: TesTask, + taskmaster_job: V1Job, + ) -> V1ConfigMap: + """Create a Kubernetes ConfigMap from a TES task.""" + assert taskmaster_job.metadata is not None, ( + "Taskmaster job metadata should have already been set while create" + " taskmaster!" + ) + + taskmaster_config_map = V1ConfigMap( + metadata=V1ObjectMeta(name=taskmaster_job.metadata.name) + ) + + assert ( + taskmaster_config_map.metadata is not None + ), "Taskmaster metadata is should have already been set!" + + if taskmaster_config_map.metadata.labels is None: + taskmaster_config_map.metadata.labels = {} + + if taskmaster_config_map.metadata.annotations is None: + taskmaster_config_map.metadata.annotations = {} + + # FIXME: What if the task name is None? + task_name = task.name or "task-name-not-set" + + taskmaster_config_map.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY + ] = task_name + + # taskmaster_config_map.metadata.labels[self.constants.label_userid_key] + # = user["username"] + + if task.tags and "GROUP_NAME" in task.tags: + taskmaster_config_map.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_GROUPNAME_KEY + ] = task.tags["GROUP_NAME"] + # elif user["is_member"]: + # taskmaster_config_map.metadata.labels[self.constants.label_groupname_key] + # = user["any_group"] + + assert taskmaster_config_map.metadata.name is not None + assert task.resources is not None + + executors_as_jobs = [ + self.from_tes_executor_to_k8s_job( + generated_task_id=taskmaster_config_map.metadata.name, + tes_task_name=task_name, + executor=executor, + executor_index=idx, + resources=task.resources, + ) + for idx, executor in enumerate(task.executors) + ] + + taskmaster_input: dict[str, Any] = { + "inputs": pydantic_model_list_dict(task.inputs) if task.inputs else [], + "outputs": pydantic_model_list_dict(task.outputs) if task.outputs else [], + "volumes": task.volumes or [], + "resources": { + "disk_gb": float(task.resources.disk_gb) + if task.resources.disk_gb + else 10.0 + }, + } + taskmaster_input[ + self.tesk_k8s_constants.job_constants.TASKMASTER_INPUT_EXEC_KEY + ] = [exec_job.to_dict() for exec_job in executors_as_jobs] + + taskmaster_input_as_json = json.loads( + json.dumps( + taskmaster_input, + default=lambda obj: float(obj) + if isinstance(obj, Decimal) + else TypeError, + ) + ) + + try: + with BytesIO() as obj: + with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file: + json_data = json.dumps(taskmaster_input_as_json) + gzip_file.write(json_data.encode("utf-8")) + taskmaster_config_map.binary_data = { + f"{self.tesk_k8s_constants.job_constants.TASKMASTER_INPUT}.gz": base64.b64encode( # noqa: E501 + obj.getvalue() + ).decode("utf-8") + } + except Exception as e: + logger.info( + ( + f"Compression of task {taskmaster_config_map.metadata.name}" + f" JSON configmap failed" + ), + e, + ) + + return taskmaster_config_map + + def from_tes_executor_to_k8s_job( # noqa: PLR0913, PLR0912 + self, + generated_task_id: str, + tes_task_name: Optional[str], + executor: TesExecutor, + executor_index: int, + resources: TesResources, + ) -> V1Job: + """Create a Kubernetes job from a TES executor.""" + # Get new template executor Job object + executor_job: V1Job = ( + self.template_supplier.get_executor_template_with_value_from_config() + ) + + # Set executors name based on taskmaster's job name + Job(executor_job).change_job_name( + Task(taskmaster_name=generated_task_id).get_executor_name(executor_index) + ) + + if executor_job.metadata is None: + executor_job.metadata = V1ObjectMeta() + + # Put arbitrary labels and annotations + executor_job.metadata.labels = executor_job.metadata.labels or {} + executor_job.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_TESTASK_ID_KEY + ] = generated_task_id + executor_job.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_EXECNO_KEY + ] = str(executor_index) + # job.metadata.labels[self.constants.label_userid_key] = user.username + + if executor_job.metadata is None: + executor_job.metadata = V1ObjectMeta() + if executor_job.metadata.annotations is None: + executor_job.metadata.annotations = {} + + if tes_task_name: + executor_job.metadata.annotations[ + self.tesk_k8s_constants.annotation_constants.ANN_TESTASK_NAME_KEY + ] = tes_task_name + + if executor_job.spec is None: + executor_job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if executor_job.spec.template.spec is None: + executor_job.spec.template.spec = V1PodSpec(containers=[]) + + container: V1Container = executor_job.spec.template.spec.containers[0] + + # TODO: Not sure what to do with this + # Convert potential TRS URI into docker image + container.image = executor.image + + if not container.command: + container.command = [] + + for command in ExecutorCommandWrapper( + executor + ).get_commands_with_stream_redirects(): + container.command.append(command) + + if executor.env: + container.env = [ + V1EnvVar(name=key, value=value) for key, value in executor.env.items() + ] + else: + container.env = [] + + container.working_dir = executor.workdir + container.resources = V1ResourceRequirements(requests={}) + + assert container.resources.requests is not None + + if resources.cpu_cores: + container.resources.requests["cpu"] = parse_quantity( + str(resources.cpu_cores) + ) + + if resources.ram_gb: + container.resources.requests["memory"] = parse_quantity( + f"{resources.ram_gb:.6f}Gi" + ) + + # # FIXME: Workaround + # # Check if volumes is None and set it to an empty list if it is + if ( + executor_job.spec + and executor_job.spec.template.spec + and executor_job.spec.template.spec.volumes is None + ): + executor_job.spec.template.spec.volumes = [] + + return executor_job diff --git a/tesk/k8s/converter/data/__init__.py b/tesk/k8s/converter/data/__init__.py new file mode 100644 index 0000000..080fc38 --- /dev/null +++ b/tesk/k8s/converter/data/__init__.py @@ -0,0 +1 @@ +"""Data structure module to handle k8s resources of TES.""" diff --git a/tesk/k8s/converter/data/job.py b/tesk/k8s/converter/data/job.py new file mode 100644 index 0000000..73251dd --- /dev/null +++ b/tesk/k8s/converter/data/job.py @@ -0,0 +1,80 @@ +"""A container for a single Kubernetes job object. + +Can be both a taskmaster and an executor, it list of worker pods (Kubernetes +Pod objects). +""" + +from enum import Enum +from typing import List, Optional + +from kubernetes.client import V1Job, V1ObjectMeta, V1Pod + + +class Job: + """Class to list worker pods (Kubernetes Pod objects).""" + + def __init__(self, job: V1Job): + """Initializes the Job with a Kubernetes job object.""" + self.job: V1Job = job + self.pods: List[V1Pod] = [] + + def get_job(self) -> V1Job: + """Returns the Kubernetes job object.""" + return self.job + + def add_pod(self, pod: V1Pod): + """Adds a single pod to the list.""" + self.pods.append(pod) + + def has_pods(self) -> bool: + """Checks if the job has any pods.""" + return bool(self.pods) + + def get_first_pod(self) -> Optional[V1Pod]: + """Returns arbitrarily chosen pod from the list. + + Currently the first one added or None if the job has no pods. + """ + if not self.has_pods(): + return None + return self.pods[0] + + def get_pods(self) -> List[V1Pod]: + """Returns the list of job pods. + + Returns in the order of addition to the list or an empty list if no pods. + """ + return self.pods + + def change_job_name(self, new_name: str): + """Changes the job name. + + Also the names in its metadata and container specs. + """ + if self.job.metadata is None: + self.job.metadata = V1ObjectMeta(name=new_name) + else: + self.job.metadata.name = new_name + + if ( + self.job is not None + and self.job.spec is not None + and self.job.spec.template is not None + and self.job.spec.template.metadata is not None + ): + self.job.spec.template.metadata.name = new_name + + if self.job.spec.template.spec and self.job.spec.template.spec.containers: + self.job.spec.template.spec.containers[0].name = new_name + + def get_job_name(self) -> Optional[str]: + """Returns the job name.""" + return self.job.metadata.name if self.job.metadata else None + + +class JobStatus(Enum): + """State of job.""" + + ACTIVE = "Active" + SUCCEEDED = "Succeeded" + FAILED = "Failed" diff --git a/tesk/k8s/converter/data/task.py b/tesk/k8s/converter/data/task.py new file mode 100644 index 0000000..3f0e0e2 --- /dev/null +++ b/tesk/k8s/converter/data/task.py @@ -0,0 +1,98 @@ +"""A composite that represents Kubernetes object's graph of a single TES task. + +- Taskmaster job with its pods. +- Executor jobs with its pods. +""" + +import re +from typing import Dict, List, Optional + +from kubernetes.client.models import V1Job, V1ObjectMeta + +from tesk.k8s.constants import tesk_k8s_constants +from tesk.k8s.converter.data.job import Job + + +class Task: + """Task is a composite. + + It represents Kubernetes object's graph of a single TES task. + """ + + def __init__( + self, taskmaster: Optional[Job] = None, taskmaster_name: Optional[str] = None + ): + """Initialize the Task.""" + if taskmaster: + self.taskmaster = taskmaster + elif taskmaster_name: + job = V1Job(metadata=V1ObjectMeta(name=taskmaster_name)) + self.taskmaster = Job(job) + else: + raise ValueError("Either taskmaster or taskmaster_name must be provided.") + + self.executors_by_name: Dict[str, Job] = {} + self.output_filer: Optional[Job] = None + self.tesk_k8s_constants = tesk_k8s_constants + self.MAX_INT = 2**31 - 1 + + def add_executor(self, executor: Job) -> None: + """Add executor to the task.""" + metadata = executor.get_job().metadata + assert metadata is not None + + name = metadata.name + assert name is not None + + self.executors_by_name.setdefault(name, executor) + + def set_output_filer(self, filer: Job): + """Set output filer for the task.""" + self.output_filer = filer + + def get_taskmaster(self) -> Job: + """Get taskmaster job.""" + return self.taskmaster + + def get_executors(self) -> List[Job]: + """Get executors.""" + return sorted(self.executors_by_name.values(), key=self.extract_executor_number) + + def get_last_executor(self) -> Optional[Job]: + """Get last executor.""" + if not self.executors_by_name: + return None + executors = self.get_executors() + return executors[-1] if executors else None + + def get_output_filer(self) -> Optional[Job]: + """Get output filer.""" + return self.output_filer + + def extract_executor_number(self, executor: Job) -> int: + """Extract executor number from the executor's name.""" + taskmaster_name = self.taskmaster.get_job_name() + assert taskmaster_name is not None + + prefix = ( + taskmaster_name + self.tesk_k8s_constants.job_constants.JOB_NAME_EXEC_PREFIX + ) + exec_name = executor.get_job_name() + + if not exec_name: + return self.MAX_INT + + match = re.match(f"{re.escape(prefix)}(\d+)", exec_name) + if match: + return int(match.group(1)) + + return self.MAX_INT + + def get_executor_name(self, executor_index: int) -> str: + """Get executor name based on the taskmaster's job name and executor index.""" + taskmaster_name = self.taskmaster.get_job_name() + return ( + f"{taskmaster_name}" + f"{self.tesk_k8s_constants.job_constants.JOB_NAME_EXEC_PREFIX}" + f"{str(executor_index).zfill(self.tesk_k8s_constants.job_constants.JOB_NAME_EXEC_NO_LENGTH)}" + ) diff --git a/tesk/k8s/converter/executor_command_wrapper.py b/tesk/k8s/converter/executor_command_wrapper.py new file mode 100644 index 0000000..ebda592 --- /dev/null +++ b/tesk/k8s/converter/executor_command_wrapper.py @@ -0,0 +1,54 @@ +"""Wraps list of executor's command. + +Such that: +- If any of executor's stdin/stdout/stderr params is set, the command runs in shell +- Each part of the original command (single command/argument) that contained shell + special chars is surrounded by single quotes, plus single quote inside such string + are replaced with '"'"' sequence +- `stdin`, `stdout`, `stderr` streams are redirected to paths according to executors + params +""" + +from typing import List + +from tesk.api.ga4gh.tes.models import TesExecutor + + +class ExecutorCommandWrapper: + """Wraps executor's command.""" + + def __init__(self, executor: TesExecutor): + """Initialize the wrapper.""" + self.executor = executor + + def get_commands_with_stream_redirects(self) -> List[str]: + """Get command with stream redirects.""" + result = [] + + if ( + not self.executor.stdin + and not self.executor.stdout + and not self.executor.stderr + ): + return self.executor.command + + result.append("/bin/sh") + result.append("-c") + + command_parts = [" ".join(self.executor.command)] + + if self.executor.stdin: + command_parts.append("<") + command_parts.append(self.executor.stdin) + + if self.executor.stdout: + command_parts.append(">") + command_parts.append(self.executor.stdout) + + if self.executor.stderr: + command_parts.append("2>") + command_parts.append(self.executor.stderr) + + result.append(" ".join(command_parts)) + + return result diff --git a/tesk/k8s/converter/template.py b/tesk/k8s/converter/template.py new file mode 100644 index 0000000..feb5d09 --- /dev/null +++ b/tesk/k8s/converter/template.py @@ -0,0 +1,192 @@ +"""Create template for kubernetes objects.""" + +import logging +import uuid +from typing import Iterable + +from kubernetes.client import ( + V1Container, + V1EnvVar, + V1JobSpec, + V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, + V1ResourceRequirements, + V1SecretVolumeSource, + V1Volume, + V1VolumeMount, +) +from kubernetes.client.models import V1Job + +from tesk.constants import tesk_constants +from tesk.custom_config import Taskmaster +from tesk.k8s.constants import tesk_k8s_constants +from tesk.utils import get_taskmaster_env_property, get_taskmaster_template + +logger = logging.getLogger(__name__) + + +class KubernetesTemplateSupplier: + """Templates for tasmaster's and executor's job object..""" + + def __init__( + self, + ): + """Initialize the converter.""" + self.taskmaster_template: V1Job = get_taskmaster_template() + self.taskmaster: Taskmaster = get_taskmaster_env_property() + self.tesk_k8s_constants = tesk_k8s_constants + self.tesk_constants = tesk_constants + + def get_taskmaster_name(self) -> str: + """Generate a unique name for the taskmaster job.""" + name: str = self.tesk_k8s_constants.job_constants.JOB_NAME_TASKM_PREFIX + str( + uuid.uuid4() + ) + return name + + def get_taskmaster_template_with_value_from_config(self) -> V1Job: + """Create a template for the taskmaster job.""" + job: V1Job = self.taskmaster_template + + if job.spec is None: + job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if job.spec.template.spec is None: + job.spec.template.spec = V1PodSpec(containers=[]) + + job.spec.template.spec.service_account_name = self.taskmaster.serviceAccountName + + container = job.spec.template.spec.containers[0] + container.image = ( + f"{self.taskmaster.imageName}:" f"{self.taskmaster.imageVersion}" + ) + + assert isinstance(container.args, Iterable) + + container.args.extend( + [ + "-n", + self.tesk_constants.TESK_NAMESPACE, + "-fn", + self.taskmaster.filerImageName, + "-fv", + self.taskmaster.filerImageVersion, + ] + ) + + if self.taskmaster.debug: + container.args.append("-d") + container.image_pull_policy = "Always" + + if job.metadata is None: + job.metadata = V1ObjectMeta(labels={}) + + if job.metadata.labels is None: + job.metadata.labels = {} + + job.metadata.labels[ + self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_KEY + ] = self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_TASKM + + # Assign a unique name to the taskmaster job + taskmaster_name = self.get_taskmaster_name() + job.metadata.name = taskmaster_name + container.name = taskmaster_name + + assert isinstance(container.env, Iterable) + + if container.env is None: + container.env = V1EnvVar() + + if self.taskmaster and self.taskmaster.environment: + container.env.extend( + [ + V1EnvVar(name=key.upper().replace(".", "_"), value=value) + for key, value in self.taskmaster.environment.items() + ] + ) + + # Set backoff env variables for `filer` and `executor` + backoff_limits = { + self.tesk_k8s_constants.job_constants.FILER_BACKOFF_LIMIT: self.tesk_constants.FILER_BACKOFF_LIMIT, # noqa: E501 + self.tesk_k8s_constants.job_constants.EXECUTOR_BACKOFF_LIMIT: self.tesk_constants.EXECUTOR_BACKOFF_LIMIT, # noqa: E501 + } + + container.env.extend( + [V1EnvVar(name=key, value=value) for key, value in backoff_limits.items()] + ) + + ftp_secrets = [ + self.tesk_k8s_constants.ftp_constants.FTP_SECRET_USERNAME_ENV, + self.tesk_k8s_constants.ftp_constants.FTP_SECRET_PASSWORD_ENV, + ] + container.env = [ + env + for env in container.env + if env.name not in ftp_secrets or self.taskmaster.ftp.enabled + ] + + if self.taskmaster.ftp.enabled: + for env in container.env: + if env.name in ftp_secrets: + assert env.value_from is not None + assert env.value_from.secret_key_ref is not None + env.value_from.secret_key_ref.name = self.taskmaster.ftp.secretName + + return job + + def get_executor_template_with_value_from_config(self) -> V1Job: + """Create a template for the executor job.""" + container = V1Container( + name=self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_EXEC, + resources=V1ResourceRequirements(), + ) + + if self.taskmaster.executorSecret is not None: + container.volume_mounts = [ + V1VolumeMount( + read_only=True, + name=str(self.taskmaster.executorSecret.name), + mount_path=str(self.taskmaster.executorSecret.mountPath), + ) + ] + + pod_spec = V1PodSpec( + containers=[container], + restart_policy=self.tesk_k8s_constants.k8s_constants.JOB_RESTART_POLICY, + ) + + job = V1Job( + api_version=self.tesk_k8s_constants.k8s_constants.K8S_BATCH_API_VERSION, + kind=self.tesk_k8s_constants.k8s_constants.K8S_BATCH_API_JOB_TYPE, + metadata=V1ObjectMeta( + labels={ + self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_KEY: ( + self.tesk_k8s_constants.label_constants.LABEL_JOBTYPE_VALUE_EXEC + ) + } + ), + spec=V1JobSpec( + template=V1PodTemplateSpec(metadata=V1ObjectMeta(), spec=pod_spec) + ), + ) + + if self.taskmaster.executorSecret is not None: + if job.spec is None: + job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if job.spec.template.spec is None: + job.spec.template.spec = V1PodSpec(containers=[]) + + job.spec.template.spec.volumes = [ + V1Volume( + name=str(self.taskmaster.executorSecret.name), + secret=V1SecretVolumeSource( + secret_name=self.taskmaster.executorSecret.name + ), + ) + ] + + assert job.spec is not None + assert job.spec.template.spec is not None + + return job diff --git a/tesk/utils.py b/tesk/utils.py index 960549f..36eb043 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -1,7 +1,9 @@ """Utility functions for the TESK package.""" +import json import os from pathlib import Path +from typing import List, Sequence from foca import Foca from kubernetes.client.models import ( @@ -20,12 +22,13 @@ V1Volume, V1VolumeMount, ) +from pydantic import BaseModel from tesk.custom_config import ( CustomConfig, Taskmaster, ) -from tesk.exceptions import ConfigInvalidError +from tesk.exceptions import ConfigInvalidError, ConfigNotFoundError from tesk.k8s.constants import tesk_k8s_constants @@ -162,3 +165,28 @@ def get_taskmaster_template() -> V1Job: ) ), ) + + +def get_taskmaster_env_property() -> Taskmaster: + """Get the taskmaster env property from the custom configuration. + + Returns: + The taskmaster env property. + """ + custom_conf = get_custom_config() + try: + return custom_conf.taskmaster + except AttributeError: + raise ConfigNotFoundError( + "Custom configuration doesn't seem to have taskmaster_env_properties in " + "config file." + f"Custom config:\n{custom_conf}" + ) from None + + +def pydantic_model_list_dict(model_list: Sequence[BaseModel]) -> List[str]: + """Convert a list of pydantic models to a list of dictionaries.""" + json_list = [] + for item in model_list: + json_list.append(json.loads(item.json())) + return json_list