Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create task #207

Open
wants to merge 5 commits into
base: template
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions tesk/api/ga4gh/tes/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
import logging

# from connexion import request # type: ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove?

from typing import Any

from foca.utils.logging import log_traffic # type: ignore

from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo
from tesk.api.ga4gh.tes.task.create_task import CreateTesTask
from tesk.exceptions import InternalServerError

# Get logger instance
logger = logging.getLogger(__name__)
Expand All @@ -26,14 +31,19 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore

# POST /tasks
@log_traffic
def CreateTask(*args, **kwargs) -> dict: # type: ignore
def CreateTask(**kwargs) -> dict: # type: ignore
"""Create task.

Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
pass
try:
request_body: Any = kwargs.get("body")
tes_task = TesTask(**request_body)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we catch a ValidationError hear and reraise as BadRequest? I mean, in principle Connexion takes care of 400-like errors, but I'm not sure how good it is. And since we defined these models anyhow, we might as well make use of them. As it is, an invalid request would lead to a 500 error (if it ever gets this far)...

response = CreateTesTask(tes_task).response()
return response
Comment on lines +43 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well return it - no point of creating a variable only to return it immediately.

except Exception as e:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Implement more granular error handling

While catching and returning an internal server error is better than uncaught exceptions, consider implementing more granular error handling. This could provide better feedback to API consumers about what went wrong.

Suggested change
except Exception as e:
except ValueError as e:
raise BadRequest(str(e)) from e
except ValidationError as e:
raise UnprocessableEntity(str(e)) from e
except ConnectionError as e:
raise ServiceUnavailable(str(e)) from e
except Exception as e:
raise InternalServerError from e

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a very good point - with the current code we couldn't get any other error than InternalServerError. But we surely want to handle 400 (okay, may be caught be Connexion), 401 and 403 differently - at the very least.

In any case, FOCA takes care of this gracefully, so I think we don't actually need more granual error handling, but no additional error handling at all! What FOCA does is to handle the errors according to tesk.exceptions.exceptions. Every one of the listed exceptions will be transformed into an appropriate JSON response, as required by the OpenAPI specs. If an exception is not specifically listed, the catch-all exception will be used and transformed into a 500 response. And this is exactly what we want here: If, say, the request is unauthorized, and we raise that error somewhere in CreateTesTask, it will bubble to the top and will be returned to the caller as such. Same goes for any other known error (including any predictable InternalServerErrors). Everything that comes unexpected will be caught by the catch-all Exception that gives a specific error message that lets the caller (and the admin) know that something really unexpected happened.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the detailed explanation, @uniqueg! It's great to hear that FOCA is handling error responses according to the tesk.exceptions.exceptions module, which ensures that the API adheres to the OpenAPI specifications. This approach indeed simplifies the error handling in the CreateTask function by allowing exceptions to bubble up and be managed at a higher level.

Given this setup, it makes sense to rely on FOCA's built-in mechanisms for transforming exceptions into appropriate JSON responses. This not only reduces redundancy in error handling but also ensures consistency across the application.

If FOCA is already providing the necessary granularity and coverage for error responses, then maintaining the current structure with a catch-all Exception for truly unexpected errors is a sound approach. Just ensure that any specific exceptions you expect to handle are properly defined in tesk.exceptions.exceptions so that they are correctly transformed by FOCA.

If you have any further questions or need additional clarification, feel free to ask!

raise InternalServerError from e


# GET /tasks/service-info
Expand Down
2 changes: 1 addition & 1 deletion tesk/api/ga4gh/tes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TesResources(BaseModel):
example={"VmSize": "Standard_D64_v3"},
)
backend_parameters_strict: Optional[bool] = Field(
False,
default=False,
description="If set to true, backends should fail the task if any "
"backend_parameters\nkey/values are unsupported, otherwise, backends should "
"attempt to run the task",
Expand Down
1 change: 1 addition & 0 deletions tesk/api/ga4gh/tes/task/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Task API controller logic."""
75 changes: 75 additions & 0 deletions tesk/api/ga4gh/tes/task/create_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""TESK API module for creating a task."""

import logging

from tesk.api.ga4gh.tes.models import TesCreateTaskResponse, TesResources, TesTask
from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest
from tesk.exceptions import KubernetesError

logger = logging.getLogger(__name__)


class CreateTesTask(TesTaskRequest):
"""Create TES task."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attributes? See below.


def __init__(
self,
task: TesTask,
):
"""Initialize the CreateTask class.

Args:
task: TES task to create.
"""
super().__init__()
self.task = task

def handle_request(self) -> TesCreateTaskResponse:
"""Create TES task."""
attempts_no = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add int type hint

while (
attempts_no < self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO
):
try:
attempts_no += 1
jemaltahir marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider logging (in debug) that you are attempting to create a Kubernetes task, together with the attempt number and the max attempt number. Something like:

"Creating Kubernetes job (attempt 3/8)"

resources = self.task.resources
minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()

if not self.task.resources:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Careful with implicit checks against falsey values. Are you sure you are okay with any falsey value for self.task.resources (False, 0, [], None)? If so, fine. If not, it's better to be explicit about what you are checking against.

self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb))
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb:
self.task.resources.ram_gb = minimum_ram_gb
Comment on lines +35 to +41
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this confusing to read. The focus seems to be jumping back and forth and I don't know why you defined resources if it's anyway referencing the same object as self.task.resources (if I'm not mistaken).

How about sth like this:

minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()

if self.task.resources is None:
    self.task.resources = TesResources(cpu_cores=1, ram_gb=minimum_ram_gb)

elif (
    self.task.resources.ram_gb is None or
    self.task.resources.ram_gb < minimum_ram_gb
):
    self.task.resources.ram_gb = minimum_ram_gb

You could also consider adding a method for setting resources (or a higher level method sanitize_request() that then calls other methods for setting resources and maybe others in the future).


taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job(
self.task,
)
taskmaster_config_map = (
self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map(
self.task,
taskmaster_job,
)
)

_ = self.kubernetes_client_wrapper.create_config_map(
taskmaster_config_map
)
created_job = self.kubernetes_client_wrapper.create_job(taskmaster_job)

assert created_job.metadata is not None
assert created_job.metadata.name is not None
Comment on lines +58 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider raising some sort of KubernetesError instead.


return TesCreateTaskResponse(id=created_job.metadata.name)

except KubernetesError as e:
if (
not e.is_object_name_duplicated()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this much harder to read/parse than:

if (
    e.status != HTTPStatus.CONFLICT
    or ...
)

Why the function and the use of not?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if your client wrapper already checked for CONFLICT/409 type errors and returned these as KubernetesConflict exceptions, you could design this more elegantly by letting the while loop conditional take care of managing the retries and then handle the "too many retries" situation below the loop with an explicit InternalServerError message. Then you could simplify this check to:

except KubernetesConflict:
    pass

All the other errors you can just let bubble up. I'm pretty sure FOCA will handle these gracefully (as mentioned below).

I think this would be considerably cleaner - and more informative.

or attempts_no
>= self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO
):
raise e

except Exception as exc:
logging.error("ERROR: In createTask", exc_info=True)
raise exc
Comment on lines +71 to +73
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this handled automatically by FOCA?


return TesCreateTaskResponse(id="") # To silence mypy, should never be reached
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this wouldn't be necessary if you defined the loop with while True:?

38 changes: 38 additions & 0 deletions tesk/api/ga4gh/tes/task/task_request.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a base class for a TESK request (any request, including GET requests) or a task request (a POST requests to tasks/ in order to request a task being created). If the former, make sure to rename the module and class, if the latter, make sure to adapt the module-level docstring.

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Base class for tesk request."""

import json
import logging
from abc import ABC, abstractmethod

from pydantic import BaseModel

from tesk.k8s.constants import tesk_k8s_constants
from tesk.k8s.converter.converter import TesKubernetesConverter
from tesk.k8s.wrapper import KubernetesClientWrapper

logger = logging.getLogger(__name__)


class TesTaskRequest(ABC):
"""Base class for tesk request ecapsulating common methods and members."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please define "Args" and "Attributes" sections, either both in the class docstring or in the constructor. Or, even better, the "Args" in the constructor and the "Attributes" in the class.

Same goes elsewhere, i.e., when you deal with classes, make sure that both of "Args" and "Attributes" sections are present (even if they are identical).


def __init__(self):
"""Initialise base class for tesk request."""
self.kubernetes_client_wrapper = KubernetesClientWrapper()
self.tes_kubernetes_converter = TesKubernetesConverter()
self.tesk_k8s_constants = tesk_k8s_constants

@abstractmethod
def handle_request(self) -> BaseModel:
"""Business logic for the request."""
pass

def response(self) -> dict:
"""Get response for the request."""
response: BaseModel = self.handle_request()
try:
res: dict = json.loads(json.dumps(response))
return res
except (TypeError, ValueError) as e:
logger.info(e)
Comment on lines +33 to +37
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the significance of this? Why not going straight for the Pydantic way of marshalling the model?

return response.dict()
4 changes: 2 additions & 2 deletions tesk/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class TeskConstants(BaseModel):
TASKMASTER_IMAGE_NAME: str = "docker.io/elixircloud/tesk-core-taskmaster"
TASKMASTER_IMAGE_VERSION: str = "latest"
TASKMASTER_SERVICE_ACCOUNT_NAME: str = "taskmaster"
FILER_BACKOFF_LIMIT: int = 2
EXECUTOR_BACKOFF_LIMIT: int = 2
FILER_BACKOFF_LIMIT: str = "2"
EXECUTOR_BACKOFF_LIMIT: str = "2"

class Config:
"""Configuration for class."""
Expand Down
4 changes: 2 additions & 2 deletions tesk/custom_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class Taskmaster(BaseModel):
environment: Optional[Dict[str, str]] = None
serviceAccountName: str = tesk_constants.TASKMASTER_SERVICE_ACCOUNT_NAME
executorSecret: Optional[ExecutorSecret] = None
filerBackoffLimit: int = tesk_constants.FILER_BACKOFF_LIMIT
executorBackoffLimit: int = tesk_constants.EXECUTOR_BACKOFF_LIMIT
filerBackoffLimit: str = tesk_constants.FILER_BACKOFF_LIMIT
executorBackoffLimit: str = tesk_constants.EXECUTOR_BACKOFF_LIMIT


class CustomConfig(BaseModel):
Expand Down
6 changes: 6 additions & 0 deletions tesk/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""App exceptions."""

from http import HTTPStatus

from connexion.exceptions import (
BadRequestProblem,
ExtraParameterProblem,
Expand All @@ -26,6 +28,10 @@ class ConfigInvalidError(ValueError):
class KubernetesError(ApiException):
"""Kubernetes error."""
Comment on lines 28 to 29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding KubernetesError to the exceptions dictionary below, with a 500 status (or whatever is appropriate). That way you can give it a specific error detail and title that would differentiate it from just an "Unknown error".


def is_object_name_duplicated(self) -> bool:
"""Check if object name is duplicated."""
return self.status == HTTPStatus.CONFLICT


# exceptions raised in app context
exceptions = {
Expand Down
1 change: 1 addition & 0 deletions tesk/k8s/converter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Module for converting Kubernetes objects to Task objects."""
Loading