feat: adds endpoint to submit hpc configs

jtyoung84 committed Oct 11, 2023
1 parent acf8253 commit 839f455
Showing 6 changed files with 349 additions and 63 deletions.
9 changes: 8 additions & 1 deletion src/aind_data_transfer_service/hpc/client.py
@@ -97,7 +97,14 @@ def get_jobs(self) -> Response:
)
return response

-    def submit_job(
+    def submit_job(self, job_def: dict) -> Response:
+        """Submit a job defined by job def"""
+        response = requests.post(
+            url=self._job_submit_url, json=job_def, headers=self.__headers
+        )
+        return response
+
+    def submit_hpc_job(
self,
script: str,
job: Optional[HpcJobSubmitSettings] = None,
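
For context, a minimal sketch of how the new submit_job method might be called. It assumes HpcClientConfigs pulls the host and credentials from environment variables, as the server code below does; the job definition fields shown are illustrative, not a documented schema.

from aind_data_transfer_service.hpc.client import HpcClient, HpcClientConfigs

# Connection settings (host, credentials) are read from the environment.
hpc_client = HpcClient(configs=HpcClientConfigs())

# A raw Slurm REST API payload; the field values here are made up.
job_def = {
    "job": {"name": "example_job", "time_limit": 180},
    "script": "#!/bin/bash\necho 'hello'",
}
response = hpc_client.submit_job(job_def=job_def)
print(response.status_code, response.json())
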
75 changes: 58 additions & 17 deletions src/aind_data_transfer_service/hpc/models.py
@@ -1,5 +1,6 @@
"""Module to contain models for hpc rest api responses."""

import json
from datetime import datetime
from pathlib import Path
from typing import Any, List, Optional, Union
@@ -14,10 +15,6 @@
)
from pydantic.typing import Literal

-from aind_data_transfer_service.configs.job_configs import (
-    BasicUploadJobConfigs,
-)


class HpcJobSubmitSettings(BaseSettings):
"""Configs to send in a post request. v0.0.36 of slurm rest api."""
@@ -405,9 +402,8 @@ def _set_default_val(values: dict, key: str, default_value: Any) -> None:
return None

@classmethod
-    def from_basic_job_configs(
+    def from_upload_job_configs(
        cls,
-        basic_upload_job_configs: BasicUploadJobConfigs,
logging_directory: Path,
aws_secret_access_key: SecretStr,
aws_access_key_id: str,
@@ -419,7 +415,6 @@ def from_basic_job_configs(
Class constructor to use when submitting a basic upload job request
Parameters
----------
-        basic_upload_job_configs : BasicUploadJobConfigs
logging_directory : Path
aws_secret_access_key : SecretStr
aws_access_key_id : str
@@ -436,20 +431,14 @@ def from_basic_job_configs(
),
"SINGULARITYENV_AWS_ACCESS_KEY_ID": aws_access_key_id,
"SINGULARITYENV_AWS_DEFAULT_REGION": aws_default_region,
"SINGULARITYENV_UPLOAD_JOB_JSON_ARGS": (
basic_upload_job_configs.json(exclude_none=True)
),
}
if aws_session_token is not None:
hpc_env[
"SINGULARITYENV_AWS_SESSION_TOKEN"
] = aws_session_token.get_secret_value()
cls._set_default_val(kwargs, "environment", hpc_env)
-        cls._set_default_val(
-            kwargs, "name", basic_upload_job_configs.s3_prefix
-        )
-        # Set default time limit to 6 hours
-        cls._set_default_val(kwargs, "time_limit", 360)
+        # Set default time limit to 3 hours
+        cls._set_default_val(kwargs, "time_limit", 180)
cls._set_default_val(
kwargs,
"standard_out",
@@ -461,10 +450,62 @@ def from_basic_job_configs(
str(logging_directory / (kwargs["name"] + "_error.out")),
)
cls._set_default_val(kwargs, "nodes", [1, 1])
-        # Set memory per node to 50 GB
-        cls._set_default_val(kwargs, "memory_per_node", 50000)
        cls._set_default_val(kwargs, "minimum_cpus_per_node", 4)
        cls._set_default_val(kwargs, "tasks", 1)
+        # 8 GB per cpu for 32 GB total memory
+        cls._set_default_val(kwargs, "memory_per_cpu", 8000)
return cls(**kwargs)

@classmethod
def attach_configs_to_script(
cls,
script: str,
base_configs: dict,
upload_configs_aws_param_store_name: Optional[str],
staging_directory: Optional[str],
) -> str:
"""
Helper method to attach configs to a base run command string.
Parameters
----------
script : str
Can be like
'#!/bin/bash \nsingularity exec --cleanenv
feat_289.sif python -m aind_data_transfer.jobs.basic_job'
base_configs : dict
job_configs to attach as --json-args
upload_configs_aws_param_store_name : Optional[str]
Will supply this config if not in base_configs and not None
staging_directory : Optional[str]
Will supply this config if not in base_configs and not None
Returns
-------
str
            The run command script to submit to the slurm cluster
"""
if staging_directory is not None:
cls._set_default_val(
base_configs, "temp_directory", staging_directory
)
if upload_configs_aws_param_store_name is not None:
cls._set_default_val(
base_configs,
"aws_param_store_name",
upload_configs_aws_param_store_name,
)

return " ".join(
[
script,
"--json-args",
"'",
json.dumps(base_configs),
"'",
]
)


class HpcJobStatusResponse(BaseModel):
"""Model for a job status response for v0.0.36
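
To make the new helper concrete, here is a rough sketch of what attach_configs_to_script produces; the config keys and paths below are invented for illustration.

from aind_data_transfer_service.hpc.models import HpcJobSubmitSettings

script = (
    "#!/bin/bash \nsingularity exec --cleanenv "
    "feat_289.sif python -m aind_data_transfer.jobs.basic_job"
)
command = HpcJobSubmitSettings.attach_configs_to_script(
    script=script,
    base_configs={"s3_bucket": "example-bucket"},
    upload_configs_aws_param_store_name="/example/param/store",
    staging_directory="/scratch/staging",
)
# command appends the serialized configs to the script:
#   ... basic_job --json-args ' {"s3_bucket": "example-bucket",
#   "temp_directory": "/scratch/staging",
#   "aws_param_store_name": "/example/param/store"} '
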
121 changes: 100 additions & 21 deletions src/aind_data_transfer_service/server.py
@@ -1,8 +1,10 @@
"""Starts and Runs Starlette Service"""
import csv
import io
import json
import logging
import os
from asyncio import sleep
from pathlib import Path

from fastapi import Request
@@ -14,6 +16,7 @@

from aind_data_transfer_service.configs.job_configs import (
BasicUploadJobConfigs,
HpcJobConfigs,
)
from aind_data_transfer_service.hpc.client import HpcClient, HpcClientConfigs
from aind_data_transfer_service.hpc.models import (
@@ -85,8 +88,69 @@ async def submit_basic_jobs(request: Request):
basic_upload_job.temp_directory = os.getenv(
"HPC_STAGING_DIRECTORY"
)
-            hpc_job = HpcJobSubmitSettings.from_basic_job_configs(
-                basic_upload_job_configs=basic_upload_job,
+            hpc_job = HpcJobConfigs(basic_upload_job_configs=basic_upload_job)
hpc_jobs.append(hpc_job)
except Exception as e:
parsing_errors.append(f"Error parsing {job}: {e.__class__}")
if parsing_errors:
status_code = 406
message = "There were errors parsing the basic job configs"
content = {
"message": message,
"data": {"responses": [], "errors": parsing_errors},
}
else:
responses = []
hpc_errors = []
for hpc_job in hpc_jobs:
try:
job_def = hpc_job.job_definition
response = hpc_client.submit_job(job_def)
response_json = response.json()
responses.append(response_json)
# Add pause to stagger job requests to the hpc
await sleep(0.2)
except Exception as e:
logging.error(repr(e))
hpc_errors.append(
f"Error processing "
f"{hpc_job.basic_upload_job_configs.s3_prefix}"
)
message = (
"There were errors submitting jobs to the hpc."
if len(hpc_errors) > 0
else "Submitted Jobs."
)
status_code = 500 if len(hpc_errors) > 0 else 200
content = {
"message": message,
"data": {"responses": responses, "errors": hpc_errors},
}
return JSONResponse(
content=content,
status_code=status_code,
)


async def submit_hpc_jobs(request: Request):
"""Post HpcJobSubmitSettings to hpc server to process."""

content = await request.json()
# content should have
# {
# "jobs": [{"hpc_settings": str, upload_job_settings: str, script: str}]
# }
hpc_client_conf = HpcClientConfigs()
hpc_client = HpcClient(configs=hpc_client_conf)
job_configs = content["jobs"]
hpc_jobs = []
parsing_errors = []
for job in job_configs:
try:
upload_job_configs = json.loads(job["upload_job_settings"])
hpc_settings = json.loads(job["hpc_settings"])
base_script = job["script"]
hpc_job = HpcJobSubmitSettings.from_upload_job_configs(
logging_directory=Path(os.getenv("HPC_LOGGING_DIRECTORY")),
aws_secret_access_key=SecretStr(
os.getenv("HPC_AWS_SECRET_ACCESS_KEY")
@@ -100,40 +164,54 @@ async def submit_basic_jobs(request: Request):
else SecretStr(os.getenv("HPC_AWS_SESSION_TOKEN"))
)
),
**hpc_settings,
)
hpc_jobs.append(hpc_job)
script = hpc_job.attach_configs_to_script(
script=base_script,
base_configs=upload_job_configs,
upload_configs_aws_param_store_name=os.getenv(
"HPC_AWS_PARAM_STORE_NAME"
),
staging_directory=os.getenv("HPC_STAGING_DIRECTORY"),
)
hpc_jobs.append((hpc_job, script))
except Exception as e:
parsing_errors.append(f"Error parsing {job}: {e.__class__}")
parsing_errors.append(
f"Error parsing {job['upload_job_settings']}: {repr(e)}"
)
if parsing_errors:
status_code = 406
message = "There were errors parsing the basic job configs"
message = "There were errors parsing the job configs"
content = {
"message": message,
"data": {"responses": [], "errors": parsing_errors},
}
else:
-        hpc_error = None
-        response_json = {}
-        try:
-            response = hpc_client.submit_job(
-                script=HpcJobSubmitSettings.script_command_str(
-                    os.getenv("HPC_SIF_LOCATION")
-                ),
-                jobs=hpc_jobs,
-            )
-            response_json = response.json()
-        except Exception as e:
-            logging.error(repr(e))
-            hpc_error = repr(e)
+        responses = []
+        hpc_errors = []
+        for hpc_job in hpc_jobs:
+            hpc_job_def = hpc_job[0]
+            try:
+                script = hpc_job[1]
+                response = hpc_client.submit_hpc_job(
+                    job=hpc_job_def, script=script
+                )
+                response_json = response.json()
+                responses.append(response_json)
+                # Add pause to stagger job requests to the hpc
+                await sleep(0.2)
+            except Exception as e:
+                logging.error(repr(e))
+                hpc_errors.append(f"Error processing " f"{hpc_job_def.name}")
message = (
"There were errors submitting jobs to the hpc."
-            if hpc_error
+            if len(hpc_errors) > 0
else "Submitted Jobs."
)
-        status_code = 500 if hpc_error else 200
+        status_code = 500 if len(hpc_errors) > 0 else 200
content = {
"message": message,
"data": {"response": response_json, "error": hpc_error},
"data": {"responses": responses, "errors": hpc_errors},
}
return JSONResponse(
content=content,
@@ -192,6 +270,7 @@ async def jobs(request: Request):
Route(
"/api/submit_basic_jobs", endpoint=submit_basic_jobs, methods=["POST"]
),
Route("/api/submit_hpc_jobs", endpoint=submit_hpc_jobs, methods=["POST"]),
Route("/jobs", endpoint=jobs, methods=["GET"]),
]

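
As a usage sketch, a POST to the new endpoint might look like the following. The host and settings values are hypothetical; note that hpc_settings and upload_job_settings are JSON-encoded strings, matching the payload shape the handler expects.

import json

import requests

payload = {
    "jobs": [
        {
            # Keyword args for HpcJobSubmitSettings.from_upload_job_configs
            "hpc_settings": json.dumps({"name": "job_1", "time_limit": 180}),
            # Configs attached to the script via --json-args
            "upload_job_settings": json.dumps(
                {"temp_directory": "/scratch/tmp"}
            ),
            "script": (
                "#!/bin/bash \nsingularity exec --cleanenv "
                "feat_289.sif python -m aind_data_transfer.jobs.basic_job"
            ),
        }
    ]
}
response = requests.post(
    "http://localhost:8000/api/submit_hpc_jobs", json=payload
)
# On success the handler returns:
#   {"message": "Submitted Jobs.",
#    "data": {"responses": [...], "errors": []}}
print(response.status_code, response.json())
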
23 changes: 20 additions & 3 deletions tests/test_hpc_client.py
@@ -106,7 +106,24 @@ def test_submit_job_response(self, mock_post: MagicMock):
"""Tests that the job submission request is sent correctly"""
mock_post.return_value = {"message": "A mocked message"}
hpc_client = HpcClient(configs=self.hpc_client_configs)
-        response = hpc_client.submit_job(
+        response = hpc_client.submit_job(job_def={"job": {"some_job"}})
self.assertEqual({"message": "A mocked message"}, response)
mock_post.assert_called_once_with(
url="http://hpc_host/job/submit",
json={"job": {"some_job"}},
headers={
"X-SLURM-USER-NAME": "hpc_username",
"X-SLURM-USER-PASSWORD": "hpc_password",
"X-SLURM-USER-TOKEN": "hpc_jwt",
},
)

@patch("requests.post")
def test_submit_hpc_job_response(self, mock_post: MagicMock):
"""Tests that the job submission request is sent correctly"""
mock_post.return_value = {"message": "A mocked message"}
hpc_client = HpcClient(configs=self.hpc_client_configs)
response = hpc_client.submit_hpc_job(
script="Hello World!", job=HpcJobSubmitSettings(name="test_job")
)
self.assertEqual({"message": "A mocked message"}, response)
@@ -121,15 +138,15 @@ def test_submit_job_response(self, mock_post: MagicMock):
)

@patch("requests.post")
-    def test_submit_jobs_response(self, mock_post: MagicMock):
+    def test_submit_hpc_jobs_response(self, mock_post: MagicMock):
"""Tests that the jobs submission request is sent correctly"""
mock_post.return_value = {"message": "A mocked message"}
hpc_client = HpcClient(configs=self.hpc_client_configs)
jobs = [
HpcJobSubmitSettings(name="test_job1"),
HpcJobSubmitSettings(name="test_job2"),
]
-        response = hpc_client.submit_job(script="Hello World!", jobs=jobs)
+        response = hpc_client.submit_hpc_job(script="Hello World!", jobs=jobs)
self.assertEqual({"message": "A mocked message"}, response)
mock_post.assert_called_once_with(
url="http://hpc_host/job/submit",