feat: create task #207
base: template
Reviewer's Guide by Sourcery
This pull request implements the creation of a task in the TESK (TES on Kubernetes) system. It introduces new modules for converting TES tasks to Kubernetes jobs, handling task creation requests, and managing the execution of tasks. The changes primarily focus on the backend implementation of task creation, including the conversion of TES task specifications to Kubernetes job configurations.
Hey @JaeAeich - I've reviewed your changes - here's some feedback:
Overall Comments:
- Consider adding more unit tests for the new converter and task creation logic to ensure robustness and catch potential edge cases.
- The error handling in CreateTesTask is good, but consider adding more detailed logging throughout the conversion process to aid in debugging and monitoring.
Here's what I looked at during the review
- 🟡 General issues: 4 issues found
- 🟡 Security: 1 issue found
- 🟢 Testing: all looks good
- 🟡 Complexity: 1 issue found
- 🟢 Documentation: all looks good
except AttributeError:
    raise ConfigNotFoundError(
        "Custom configuration doesn't seem to have taskmaster_env_properties in "
        "config file."
        f"Custom config:\n{custom_conf}"
    ) from None


def pydantic_model_list_dict(model_list: Sequence[BaseModel]) -> List[str]:
suggestion (performance): Consider optimizing the pydantic_model_list_dict function
The current implementation converts to JSON and back, which may be inefficient. Consider using Pydantic's built-in dict() method instead, like this: return [item.dict() for item in model_list]. This would be more direct and likely more performant.
def pydantic_model_list_dict(model_list: Sequence[BaseModel]) -> List[Dict]:
    return [item.dict() for item in model_list]
    tes_task = TesTask(**request_body)
    response = CreateTesTask(tes_task).response()
    return response
except Exception as e:
suggestion: Implement more granular error handling
While catching and returning an internal server error is better than uncaught exceptions, consider implementing more granular error handling. This could provide better feedback to API consumers about what went wrong.
- except Exception as e:
+ except ValueError as e:
+     raise BadRequest(str(e)) from e
+ except ValidationError as e:
+     raise UnprocessableEntity(str(e)) from e
+ except ConnectionError as e:
+     raise ServiceUnavailable(str(e)) from e
+ except Exception as e:
+     raise InternalServerError from e
I think this is a very good point - with the current code we couldn't get any other error than InternalServerError. But we surely want to handle 400 (okay, may be caught by Connexion), 401 and 403 differently - at the very least.
In any case, FOCA takes care of this gracefully, so I think we don't actually need more granular error handling, but no additional error handling at all! What FOCA does is handle the errors according to tesk.exceptions.exceptions. Every one of the listed exceptions will be transformed into an appropriate JSON response, as required by the OpenAPI specs. If an exception is not specifically listed, the catch-all exception will be used and transformed into a 500 response. And this is exactly what we want here: if, say, the request is unauthorized, and we raise that error somewhere in CreateTesTask, it will bubble to the top and be returned to the caller as such. The same goes for any other known error (including any predictable InternalServerErrors). Anything unexpected will be caught by the catch-all Exception, which gives a specific error message that lets the caller (and the admin) know that something really unexpected happened.
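For illustration only, a minimal sketch of the dispatch described above. It is not FOCA's implementation: the `EXCEPTIONS` mapping, the `Unauthorized` and `Forbidden` classes, and the `to_response`, `create_task` and `handle` helpers are all hypothetical stand-ins defined locally, meant to show why the controller itself needs no try/except.

```python
from typing import Any, Dict, Tuple, Type


class Unauthorized(Exception):
    """Placeholder for a known 401 error (hypothetical)."""


class Forbidden(Exception):
    """Placeholder for a known 403 error (hypothetical)."""


# Hypothetical registry: exception class -> response description.
EXCEPTIONS: Dict[Type[Exception], Dict[str, Any]] = {
    Unauthorized: {"title": "Unauthorized", "status": 401},
    Forbidden: {"title": "Forbidden", "status": 403},
    # Catch-all entry used for anything not listed explicitly.
    Exception: {"title": "Unknown error", "status": 500},
}


def to_response(exc: Exception) -> Tuple[Dict[str, Any], int]:
    """Map a raised exception to a (JSON body, status code) pair."""
    entry = EXCEPTIONS.get(type(exc), EXCEPTIONS[Exception])
    return dict(entry), entry["status"]


def create_task(authorized: bool) -> Dict[str, str]:
    """Controller stand-in: no try/except, errors simply bubble up."""
    if not authorized:
        raise Unauthorized("missing credentials")
    return {"id": "task-123"}


def handle(authorized: bool) -> Tuple[Dict[str, Any], int]:
    """Stand-in for the framework layer that wraps every controller."""
    try:
        return create_task(authorized), 200
    except Exception as exc:  # the single catch-all lives here
        return to_response(exc)
```

With this shape, a known error raised deep inside the controller surfaces with its own status code, and anything unlisted falls back to the 500 entry.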
Thank you for the detailed explanation, @uniqueg! It's great to hear that FOCA is handling error responses according to the tesk.exceptions.exceptions module, which ensures that the API adheres to the OpenAPI specifications. This approach indeed simplifies the error handling in the CreateTask function by allowing exceptions to bubble up and be managed at a higher level.
Given this setup, it makes sense to rely on FOCA's built-in mechanisms for transforming exceptions into appropriate JSON responses. This not only reduces redundancy in error handling but also ensures consistency across the application.
If FOCA is already providing the necessary granularity and coverage for error responses, then maintaining the current structure with a catch-all Exception for truly unexpected errors is a sound approach. Just ensure that any specific exceptions you expect to handle are properly defined in tesk.exceptions.exceptions so that they are correctly transformed by FOCA.
If you have any further questions or need additional clarification, feel free to ask!
from tesk.api.ga4gh.tes.models import TesExecutor


class ExecutorCommandWrapper:
🚨 issue (security): Review ExecutorCommandWrapper for security implications
The logic for handling shell commands and stream redirection is intricate. Ensure that this class is robust against potential command injection vulnerabilities. Consider adding additional sanitization or using a library designed for safe command execution.
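As one possible direction, here is a minimal sketch using `shlex.quote` from the standard library. The function name `build_shell_command` and its parameters are assumptions for illustration and do not reflect how `ExecutorCommandWrapper` is actually written; note also that quoting every token would disable any shell features a task command relies on intentionally.

```python
import shlex
from typing import List, Optional


def build_shell_command(
    command: List[str],
    stdin: Optional[str] = None,
    stdout: Optional[str] = None,
    stderr: Optional[str] = None,
) -> str:
    """Join a command and its stream redirects into one shell string.

    Every user-supplied token is passed through shlex.quote, so shell
    metacharacters in arguments or paths are treated as literal text.
    """
    parts = [shlex.quote(token) for token in command]
    if stdin:
        parts.append("< " + shlex.quote(stdin))
    if stdout:
        parts.append("> " + shlex.quote(stdout))
    if stderr:
        parts.append("2> " + shlex.quote(stderr))
    return " ".join(parts)
```

An argument such as `hi; rm -rf /` then reaches the shell as a single quoted string rather than as a second command.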
        return self.output_filer

    def extract_executor_number(self, executor: Job) -> int:
        """Extract executor number from the executor's name."""
issue (code-quality): We've found these issues:
- Use named expression to simplify assignment and conditional [×2] (use-named-expression)
- Lift code into else after jump in control flow [×2] (reintroduce-else)
- Swap if/else branches (swap-if-else-branches)
- Replace if statement with if expression (assign-if-exp)
What do you suggest @sourcery-ai?
Certainly! Here are some suggestions to address the code quality issues identified:
- Use Named Expression: You can simplify assignments and conditionals by using the "walrus operator" (:=). This allows you to assign a value to a variable as part of an expression. For example, if you have a conditional check that assigns a value to a variable, you can combine them into a single line.
- Lift Code into Else After Jump in Control Flow: If you have a return or raise statement in an if block, you can often simplify the code by moving the subsequent code into an else block. This can make the control flow clearer.
- Swap If/Else Branches: Sometimes, swapping the branches of an if/else statement can make the code more readable, especially if it aligns better with the logical flow of the program.
- Replace If Statement with If Expression: If you have a simple if/else statement that assigns a value to a variable, you can often replace it with a single line using a conditional expression (also known as a ternary operator). This can make the code more concise.
If you have specific lines of code you'd like help with, feel free to share them, and I can provide more detailed suggestions!
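To make the first and last suggestions concrete, here is a small before/after sketch. It does not reproduce the PR's `extract_executor_number`; the `-ex-<number>` naming scheme, the `-1` fallback and both function names are assumptions chosen for illustration. It requires Python 3.8 or later for the `:=` operator.

```python
import re
from typing import Optional

# Assumed naming scheme, for illustration only: "<task>-ex-<number>".
EXECUTOR_SUFFIX = re.compile(r"-ex-(\d+)$")


def extract_number_before(name: Optional[str]) -> int:
    """Original style: separate assignment, check, and if/else assignment."""
    if name is None:
        return -1
    match = EXECUTOR_SUFFIX.search(name)
    if match:
        number = int(match.group(1))
    else:
        number = -1
    return number


def extract_number_after(name: Optional[str]) -> int:
    """Same behaviour using a named expression and direct returns."""
    if name and (match := EXECUTOR_SUFFIX.search(name)):
        return int(match.group(1))
    return -1
```

Both functions return the same value for every input; the second simply binds and tests `match` in one step.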
        self.executor = executor

    def get_commands_with_stream_redirects(self) -> List[str]:
        """Get command with stream redirects."""
issue (code-quality): We've found these issues:
- Merge consecutive list appends into a single extend [×4] (merge-list-appends-into-extend)
- Move assignment closer to its usage within a block [×2] (move-assign-in-block)
- Merge append into list declaration (merge-list-append)
- Merge extend into list declaration (merge-list-extend)
- Inline variable that is immediately returned (inline-immediately-returned-variable)
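As a rough illustration of what these hints amount to, consider the following before/after pair. The function names and the redirect tokens are hypothetical and are not taken from `get_commands_with_stream_redirects`.

```python
from typing import List


def redirects_before(stdout: str, stderr: str) -> List[str]:
    """Original style: declare a list, append item by item, then return it."""
    result: List[str] = []
    result.append(">")
    result.append(stdout)
    result.append("2>")
    result.append(stderr)
    return result


def redirects_after(stdout: str, stderr: str) -> List[str]:
    """Appends merged into the list literal, which is returned directly."""
    return [">", stdout, "2>", stderr]
```

The behaviour is unchanged; only the intermediate variable and the repeated `append` calls disappear.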
Similar comments from PR 206, can be revisited here as well. All in all great work :)
        self.executors_by_name: Dict[str, Job] = {}
        self.output_filer: Optional[Job] = None
        self.tesk_k8s_constants = tesk_k8s_constants
        self.MAX_INT = 2**31 - 1
Could you add a comment that explains what MAX_INT is?
Check this: Python ints have no fixed range, so to keep the behaviour similar to the Java implementation I added the maximum int value explicitly.
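A short sketch of the kind of comment and usage this could lead to. How the PR actually uses `MAX_INT` is not shown in this thread, so the `clamp_to_java_int` helper and the constant name `JAVA_INT_MAX` are assumptions.

```python
# Largest value of a signed 32-bit integer, i.e. Java's Integer.MAX_VALUE.
# Python ints are arbitrary precision, so the bound has to be stated explicitly.
JAVA_INT_MAX = 2**31 - 1


def clamp_to_java_int(value: int) -> int:
    """Cap a Python int at the largest value a Java int can hold."""
    return min(value, JAVA_INT_MAX)
```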

        if self.taskmaster.debug:
            container.args.append("-d")
            container.image_pull_policy = "Always"
The pull policy could be made configurable instead of hardcoded; see https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy
Yes, but the intent here is that if debug is on for the taskmaster, images are always pulled so that a developer can keep iterating on what fails and what doesn't, hence the hardcoded "Always".
Can you please elaborate if you meant something else? I am a little confused.
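One way to reconcile both positions, sketched under assumptions: an optional configured policy takes precedence, and debug mode still defaults to "Always". The function name, the `configured` parameter, the precedence order and the "IfNotPresent" fallback are all hypothetical; only the three policy values come from the Kubernetes documentation linked above.

```python
from typing import Optional

# The three values Kubernetes accepts for imagePullPolicy.
VALID_PULL_POLICIES = ("Always", "IfNotPresent", "Never")


def resolve_image_pull_policy(debug: bool, configured: Optional[str] = None) -> str:
    """Pick an image pull policy for the taskmaster container.

    An explicitly configured value wins; otherwise debug mode pulls on
    every run and normal mode reuses a locally cached image.
    """
    if configured is not None:
        if configured not in VALID_PULL_POLICIES:
            raise ValueError(f"Unsupported image pull policy: {configured!r}")
        return configured
    return "Always" if debug else "IfNotPresent"
```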
Haven't yet managed to review the converter subpackage. Let's see that we get #206 merged and all of the other comments addressed first as things are becoming too complex to review effectively.
@@ -3,9 +3,14 @@
import logging

# from connexion import request # type: ignore
Remove?
    response = CreateTesTask(tes_task).response()
    return response
Might as well return it directly - no point in creating a variable only to return it immediately.
    pass
try:
    request_body: Any = kwargs.get("body")
    tes_task = TesTask(**request_body)
Shouldn't we catch a ValidationError here and reraise as BadRequest? I mean, in principle Connexion takes care of 400-like errors, but I'm not sure how good it is. And since we defined these models anyhow, we might as well make use of them. As it is, an invalid request would lead to a 500 error (if it ever gets this far)...
class KubernetesError(ApiException):
    """Kubernetes error."""
Consider adding KubernetesError to the exceptions dictionary below, with a 500 status (or whatever is appropriate). That way you can give it a specific error detail and title that would differentiate it from just an "Unknown error".
resources = self.task.resources
minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()

if not self.task.resources:
    self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb))
if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb:
    self.task.resources.ram_gb = minimum_ram_gb
I find this confusing to read. The focus seems to jump back and forth, and I don't know why you defined resources if it references the same object as self.task.resources anyway (if I'm not mistaken).
How about something like this:
minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()
if self.task.resources is None:
    self.task.resources = TesResources(cpu_cores=1, ram_gb=minimum_ram_gb)
elif (
    self.task.resources.ram_gb is None or
    self.task.resources.ram_gb < minimum_ram_gb
):
    self.task.resources.ram_gb = minimum_ram_gb
You could also consider adding a method for setting resources (or a higher-level method sanitize_request() that then calls other methods for setting resources and maybe others in the future).
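A sketch of what such a dedicated helper might look like. `Resources` is a local stand-in for `TesResources` with only the two fields discussed here, and `with_minimum_ram` is a hypothetical name; the default of one CPU core follows the suggestion above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Resources:
    """Stand-in for TesResources with only the fields used here."""

    cpu_cores: Optional[int] = None
    ram_gb: Optional[float] = None


def with_minimum_ram(
    resources: Optional[Resources], minimum_ram_gb: float
) -> Resources:
    """Return resources whose ram_gb is at least minimum_ram_gb."""
    if resources is None:
        # Nothing requested: fall back to one core and the minimum RAM.
        return Resources(cpu_cores=1, ram_gb=minimum_ram_gb)
    if resources.ram_gb is None or resources.ram_gb < minimum_ram_gb:
        resources.ram_gb = minimum_ram_gb
    return resources
```

Keeping the rule in one function makes each case (no resources, RAM unset, RAM too low, RAM sufficient) easy to test in isolation.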
assert created_job.metadata is not None
assert created_job.metadata.name is not None
Consider raising some sort of KubernetesError instead.
    attempts_no < self.tesk_k8s_constants.job_constants.JOB_CREATE_ATTEMPTS_NO
):
    try:
        attempts_no += 1
Consider logging (in debug) that you are attempting to create a Kubernetes task, together with the attempt number and the max attempt number. Something like:
"Creating Kubernetes job (attempt 3/8)"
except Exception as exc:
    logging.error("ERROR: In createTask", exc_info=True)
    raise exc
Isn't this handled automatically by FOCA?
    logging.error("ERROR: In createTask", exc_info=True)
    raise exc

return TesCreateTaskResponse(id="")  # To silence mypy, should never be reached
I suppose this wouldn't be necessary if you defined the loop with while True:?
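A minimal sketch of that loop shape, also including the debug log line proposed earlier in this review. `JobCreationError` is a placeholder for whichever error should trigger a retry, and `create_with_retries` with its `create_job` callable is a hypothetical signature rather than the PR's code.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


class JobCreationError(Exception):
    """Placeholder for the error that should trigger a retry."""


def create_with_retries(create_job: Callable[[], str], max_attempts: int) -> str:
    """Call create_job until it succeeds or max_attempts is exhausted."""
    attempt = 0
    while True:
        attempt += 1
        logger.debug(
            "Creating Kubernetes job (attempt %d/%d)", attempt, max_attempts
        )
        try:
            return create_job()
        except JobCreationError:
            if attempt >= max_attempts:
                raise  # out of attempts: let the error bubble up
            # Otherwise loop around and try again.
```

Because every path through `while True` either returns or raises, no unreachable fallback return is needed to satisfy a type checker.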
Summary by Sourcery
Add functionality to create TES tasks and convert them into Kubernetes jobs, including new modules and API endpoints. Refactor existing code for consistency and enhance error handling.