diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py
index 3d0d76b..aeec2ca 100644
--- a/cdmtaskservice/app_state.py
+++ b/cdmtaskservice/app_state.py
@@ -13,7 +13,6 @@
 import os
 from pathlib import Path
 from typing import NamedTuple, Callable
-from yarl import URL
 
 from cdmtaskservice.config_s3 import S3Config
 from cdmtaskservice.config import CDMTaskServiceConfig
@@ -28,7 +27,6 @@
 from cdmtaskservice.jobflows.lawrencium_jaws import LawrenciumJAWSRunner
 from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner
 from cdmtaskservice.job_state import JobState
-from cdmtaskservice.localfiles import get_condor_exe_url, get_code_archive_url
 from cdmtaskservice.notifications.kafka_notifications import KafkaNotifier
 from cdmtaskservice.mongo import MongoDAO
 from cdmtaskservice.nersc.client import NERSCSFAPIClientProvider
@@ -265,19 +263,12 @@ def _register_kbase_job_flow(
     kafka_notifier: KafkaNotifier,
     coman: CoroutineWrangler
 ):
-    exe_ov = URL(cfg.condor_exe_url_override) if cfg.condor_exe_url_override else None
-    code_ov = URL(cfg.code_archive_url_override) if cfg.code_archive_url_override else None
     kbase_provider = KBaseFlowProvider.create(
-        Path(cfg.condor_initialdir),
-        str(get_condor_exe_url(base_url=URL(cfg.service_root_url), override=exe_ov)),
-        str(get_code_archive_url(base_url=URL(cfg.service_root_url), override=code_ov)),
+        cfg.get_condor_client_config(),
         mongodao,
         s3config,
         kafka_notifier,
         coman,
-        cfg.service_root_url,
-        cfg.get_condor_paths(),
-        condor_client_group=cfg.condor_clientgroup,
     )
 
     flowman.register_flow(KBaseRunner.CLUSTER, kbase_provider.get_kbase_job_flow)
diff --git a/cdmtaskservice/condor/client.py b/cdmtaskservice/condor/client.py
index b38bb57..3f457bb 100644
--- a/cdmtaskservice/condor/client.py
+++ b/cdmtaskservice/condor/client.py
@@ -5,18 +5,14 @@
 
 import asyncio
 from classad2 import ClassAd, ExprTree
-from dataclasses import dataclass
 import htcondor2
 from pathlib import Path
 import posixpath
 from typing import Any
 from yarl import URL
 
-from cdmtaskservice.arg_checkers import (
-    not_falsy as _not_falsy,
-    require_string as _require_string,
-    check_num as _check_num,
-)
+from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, check_num as _check_num
+from cdmtaskservice.condor.config import CondorClientConfig
 from cdmtaskservice.config_s3 import S3Config
 from cdmtaskservice import models
@@ -106,69 +102,28 @@
 }
 
 
-@dataclass(frozen=True)
-class HTCondorWorkerPaths:
-    """ Paths where external executors should look up secrets. """
-
-    # TODO CODE check args aren't None / whitespace only. Not a lib class so YAGNI for now
-
-    token_path: str
-    """
-    The path on the condor worker containing a KBase token for use when
-    contacting the service.
-    """
-
-    s3_access_secret_path: str
-    """
-    The path on the condor worker containing the s3 access secret for the S3 instance.
-    """
-
-
 class CondorClient:
     """ The condor client. """
 
-    def __init__(  # This is getting too long
-        self,
-        schedd: htcondor2.Schedd,
-        initial_dir: Path,
-        service_root_url: str,
-        executable_url: str,
-        code_archive_url: str,
-        s3config: S3Config,
-        paths: HTCondorWorkerPaths,
-        client_group: str | None = None,
-    ):
+    def __init__(self, schedd: htcondor2.Schedd, config: CondorClientConfig, s3config: S3Config):
         """
         Create the client.
 
         schedd: An htcondor Schedd instance, configured to submit jobs to the cluster.
-        initial_dir - where the job logs should be stored on the condor scheduler host.
-            The path must exist there and locally.
-        service_root_url - the URL of the service root, used by the remote job to get and update
-            job state.
-        executable_url - the url for the executable to run in the condor worker for each job.
-            Must end in a file name and have no query or fragment.
-        code_archive_url - the url for the *.tgz file code archive to transfer to the condor worker
-            for each job. Must end in a file name and have no query or fragment.
+        config - the configuration for the client.
         s3Config - the configuration for the S3 instance where files are stored.
-        paths - the paths on the HTCondor workers should look for job secrets.
-        client_group - the client group to submit jobs to, if any. This is a classad on
-            a worker with the name CLIENTGROUP.
         """
         self._schedd = _not_falsy(schedd, "schedd")
-        self._initial_dir = _not_falsy(initial_dir, "initial_dir")
+        self._config = _not_falsy(config, "config")
         # Why this has to exist locally is beyond me
-        self._initial_dir.mkdir(parents=True, exist_ok=True)
-        self._service_root_url = _require_string(service_root_url, "service_root_url")
-        self._exe_url = _require_string(executable_url, "executable_url")
+        Path(self._config.initial_dir).mkdir(parents=True, exist_ok=True)
+        self._exe_url = config.get_executable_url()
         self._exe_name = self._get_name_from_url(self._exe_url)
-        self._code_archive_url = _require_string(code_archive_url, "code_archive_url")
+        self._code_archive_url = config.get_code_archive_url()
         self._code_archive_name = self._get_name_from_url(self._code_archive_url)
         self._s3config = _not_falsy(s3config, "s3config")
-        self._paths = _not_falsy(paths, "paths")
-        self._cligrp = client_group
 
     def _get_name_from_url(self, url: str) -> str:
         parsed = URL(url)
@@ -187,13 +142,12 @@ def _get_environment(self, job: models.Job) -> str:
             "JOB_ID": job.id,
             "CONTAINER_NUMBER": "$(container_number)",
             "CODE_ARCHIVE": self._code_archive_name,
-            "SERVICE_ROOT_URL": self._service_root_url,
-            "TOKEN_PATH": self._paths.token_path,
+            "SERVICE_ROOT_URL": self._config.service_root_url,
+            "TOKEN_PATH": self._config.token_path,
             "S3_URL": self._s3config.internal_url,  # could add a toggle to use external if needed
             "S3_ACCESS_KEY": self._s3config.access_key,
-            "S3_SECRET_PATH": self._paths.s3_access_secret_path,
+            "S3_SECRET_PATH": self._config.s3_access_secret_path,
             "S3_ERROR_LOG_PATH": self._s3config.error_log_path,
-            # TODO CONDOR need minio url
         }
         if self._s3config.insecure:
             env["S3_INSECURE"] = "TRUE"
@@ -211,7 +165,7 @@ def _get_sub(self, job: models.Job) -> tuple[htcondor2.Submit, list[dict[str, st
             "shell": f"bash {self._exe_name}",
             # Has to exist locally and on the condor Schedd host
             # Which doesn't make any sense
-            "initialdir": str(self._initial_dir),
+            "initialdir": self._config.initial_dir,
             "transfer_input_files": f"{self._exe_url}, {self._code_archive_url}",
             "environment": self._get_environment(job),
             "output": f"cts/{job.id}/cts-{job.id}-$(container_number).out",
@@ -232,9 +186,9 @@ def _get_sub(self, job: models.Job) -> tuple[htcondor2.Submit, list[dict[str, st
             f"+{_AD_JOB_ID}": f'"{job.id}"',  # must be quoted
             f"+{_AD_CONTAINER_NUMBER}": "$(container_number)",
         }
-        if self._cligrp:
+        if self._config.client_group:
             # HTCondor will && this with its own requirements
-            subdict["requirements"] = f'(CLIENTGROUP == "{self._cligrp}")'
+            subdict["requirements"] = f'(CLIENTGROUP == "{self._config.client_group}")'
         sub = htcondor2.Submit(subdict | STATIC_SUB)
         itemdata = [
             {"container_number": str(i)}
diff --git a/cdmtaskservice/condor/config.py b/cdmtaskservice/condor/config.py
new file mode 100644
index 0000000..a1e9790
--- /dev/null
+++ b/cdmtaskservice/condor/config.py
@@ -0,0 +1,69 @@
+"""
+Configuration for the CTS HTCondor client.
+"""
+
+from pydantic import BaseModel, Field
+from yarl import URL
+
+from cdmtaskservice.localfiles import get_condor_exe_url, get_code_archive_url
+
+
+class CondorClientConfig(BaseModel):
+    """
+    Configuration items for the HTCondor client.
+    """
+
+    initial_dir: str
+    """
+    Where the job logs should be stored on the condor scheduler host.
+    The path must exist there and locally.
+    """
+
+    service_root_url: str
+    """
+    The URL of the service root, used by the remote job to get and update job state.
+    """
+
+    executable_url_override: str | None
+    """
+    A URL, if any, to use for downloading the HTCondor shell script rather than the
+    default location.
+    Must end in a file name and have no query or fragment.
+    """
+
+    code_archive_url_override: str | None
+    """
+    A URL, if any, to use for downloading the tgz code archive rather than the default location.
+    Must end in a file name and have no query or fragment.
+    """
+
+    client_group: str | None
+    """
+    The client group to submit jobs to, if any. This is a classad on a worker with
+    the name CLIENTGROUP.
+    """
+
+    token_path: str
+    """
+    The path on the condor worker containing a KBase token for use when
+    contacting the service.
+    """
+
+    s3_access_secret_path: str
+    """
+    The path on the condor worker containing the s3 access secret for the S3 instance.
+    """
+
+    def get_executable_url(self) -> str:
+        """
+        Get the URL where the external executor should download the shell executable for the job.
+        """
+        exe_ov = URL(self.executable_url_override) if self.executable_url_override else None
+        return str(get_condor_exe_url(base_url=URL(self.service_root_url), override=exe_ov))
+
+    def get_code_archive_url(self) -> str:
+        """
+        Get the URL where the external executor should download the CTS code archive.
+        """
+        code_ov = URL(self.code_archive_url_override) if self.code_archive_url_override else None
+        return str(get_code_archive_url(base_url=URL(self.service_root_url), override=code_ov))
diff --git a/cdmtaskservice/config.py b/cdmtaskservice/config.py
index aedef30..5a38b1a 100644
--- a/cdmtaskservice/config.py
+++ b/cdmtaskservice/config.py
@@ -6,7 +6,7 @@
 import tomllib
 from typing import BinaryIO, TextIO
 
-from cdmtaskservice.condor.client import HTCondorWorkerPaths
+from cdmtaskservice.condor.config import CondorClientConfig
 from cdmtaskservice.config_s3 import S3Config
 from cdmtaskservice.jaws.config import JAWSConfig
 from cdmtaskservice.nersc.paths import NERSCPaths
@@ -249,11 +249,16 @@ def get_jaws_config(self) -> JAWSConfig:
             url=self.jaws_url,
         )
 
-    def get_condor_paths(self) -> HTCondorWorkerPaths:
+    def get_condor_client_config(self) -> CondorClientConfig:
         """
-        Get information about environment variables on HTcondor workers for external executors.
+        Get the configuration items for the condor client.
""" - return HTCondorWorkerPaths( + return CondorClientConfig( + initial_dir=self.condor_initialdir, + service_root_url=self.service_root_url, + executable_url_override=self.condor_exe_url_override, + code_archive_url_override=self.code_archive_url_override, + client_group=self.condor_clientgroup, token_path=self.condor_token_path, s3_access_secret_path=self.condor_s3_access_secret_path, ) diff --git a/cdmtaskservice/config_s3.py b/cdmtaskservice/config_s3.py index febeaf4..d404783 100644 --- a/cdmtaskservice/config_s3.py +++ b/cdmtaskservice/config_s3.py @@ -3,7 +3,6 @@ """ from pydantic import BaseModel, Field -from typing import Optional from cdmtaskservice.s3.client import S3Client @@ -36,11 +35,10 @@ class S3Config(BaseModel): verify_external_url: bool = True """ Whether to verify connectivity to the external S3 url at service startup. """ - # Internal singletons - _s3_client: Optional[S3Client] = None - _s3_external_client: Optional[S3Client] = None + _s3_client: S3Client | None = None + _s3_external_client: S3Client | None = None async def initialize_clients(self): """ diff --git a/cdmtaskservice/jobflows/jaws_flows_provider.py b/cdmtaskservice/jobflows/jaws_flows_provider.py index d861bfe..4671be7 100644 --- a/cdmtaskservice/jobflows/jaws_flows_provider.py +++ b/cdmtaskservice/jobflows/jaws_flows_provider.py @@ -141,7 +141,7 @@ async def close(self): self._closed = True await self._nersc_status_cli.close() if self._sfapi_client: - await self._flow_builds.sfapi_client.destroy() + await self._sfapi_client.destroy() if isinstance(self._build_state, _Dependencies): await self._build_state.jaws_client.close() diff --git a/cdmtaskservice/jobflows/kbase.py b/cdmtaskservice/jobflows/kbase.py index 6999f90..1940be3 100644 --- a/cdmtaskservice/jobflows/kbase.py +++ b/cdmtaskservice/jobflows/kbase.py @@ -4,7 +4,6 @@ import htcondor2 import logging -from pathlib import Path import time from typing import Any @@ -13,7 +12,8 @@ check_num as _check_num, require_string as _require_string, ) -from cdmtaskservice.condor.client import CondorClient, HTCondorWorkerPaths +from cdmtaskservice.condor.client import CondorClient +from cdmtaskservice.condor.config import CondorClientConfig from cdmtaskservice.config_s3 import S3Config from cdmtaskservice.coroutine_manager import CoroutineWrangler from cdmtaskservice.exceptions import ( @@ -220,7 +220,6 @@ async def update_container_state( f"Cannot update a container to state {update.new_state.value}" ) update_func = self._SUBJOB_STATE_TO_UPDATE_FUNC[update.new_state] - # TODO TEST will need a way to mock out timestamps # Just throw the error, don't error out the job. If the caller thinks this is an error # they can try and set the error state. await self._mongo.update_subjob_state(job.id, container_num, update_func(), update.time) @@ -279,51 +278,30 @@ class KBaseFlowProvider: @classmethod def create( cls, - condor_initial_dir: Path, - condor_executable_url: str, - condor_code_archive_url: str, + condor_config: CondorClientConfig, mongodao: MongoDAO, s3config: S3Config, kafka_notifier: KafkaNotifier, coman: CoroutineWrangler, - service_root_url: str, - paths: HTCondorWorkerPaths, - condor_client_group: str | None = None, ): """ WARNING: this class is not thread safe. Create the flow provider. - - condor_initial_dir - where the job logs should be stored on the condor scheduler host. - The path must exist there and locally. - condor_executable_url - the url for the executable to run in the condor worker for - each job. 
-        condor_code_archive_url - the url for the *.tgz file code archive to transfer to
-            the condor worker for each job. Must end in a file name and have no query or fragment.
+
+        condor_config - the configuration for the condor client.
         mongodao - the Mongo DAO.
         s3config - the S3 configuration.
         kafka_notifier - a kafka notifier.
         coman - a coroutine manager.
-        service_root_url - the URL of the service root, used by the remote job to get and update
-            job state.
-        paths - paths where external executors should look up job information
-            on the HTcondor workers.
-        condor_client_group - the client group to submit jobs to, if any. This is a classad on
-            a worker with the name CLIENTGROUP.
         """
         kb = cls()
-        kb._initial_dir = _not_falsy(condor_initial_dir, "condor_initial_dir")
-        kb._exe_url = _require_string(condor_executable_url, "condor_executable_url")
-        kb._code_archive_url = _require_string(condor_code_archive_url, "condor_code_archive_url")
+        kb._condor_config = _not_falsy(condor_config, "condor_config")
         kb._mongodao = _not_falsy(mongodao, "mongodao")
         kb._s3config = _not_falsy(s3config, "s3config")
         kb._kafka = _not_falsy(kafka_notifier, "kafka_notifier")
         kb._coman = _not_falsy(coman, "coman")
-        kb._service_root_url = _require_string(service_root_url, "service_root_url")
-        kb._paths = _not_falsy(paths, "paths")
-        kb._cligrp = condor_client_group
         kb._logr = logging.getLogger(__name__)
@@ -392,23 +370,13 @@ def _build_deps(self):
         schedd_ad = collector.locate(htcondor2.DaemonTypes.Schedd)
         schedd = htcondor2.Schedd(schedd_ad)
         self._logr.info("Done")
-        condor = CondorClient(
-            schedd,
-            self._initial_dir,
-            self._service_root_url,
-            self._exe_url,
-            self._code_archive_url,
-            self._s3config,
-            self._paths,
-            client_group=self._cligrp,
-            # TODO CONDOR add s3 host / insecure/ log path
-        )
+        condor = CondorClient(schedd, self._condor_config, self._s3config)
         kbase = KBaseRunner(
             condor,
             self._mongodao,
             self._s3config.get_internal_client(),
             self._kafka,
-            self._coman
+            self._coman,
         )
         self._kbase = kbase
         self._last_fail_time = None
diff --git a/test/condor/condor_config_test.py b/test/condor/condor_config_test.py
new file mode 100644
index 0000000..891fb76
--- /dev/null
+++ b/test/condor/condor_config_test.py
@@ -0,0 +1,7 @@
+# TODO TEST add tests
+
+from cdmtaskservice.condor import config  # @UnusedImport
+
+
+def test_noop():
+    pass
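
Usage sketch (illustrative, not part of the patch): the refactor above collapses the separate initial dir, URL, client group, and secret path arguments to CondorClient and KBaseFlowProvider.create into a single CondorClientConfig, normally built via CDMTaskServiceConfig.get_condor_client_config(). A minimal sketch of constructing the config directly and resolving the executor download URLs follows, e.g. as a starting point for the TODO tests in test/condor/condor_config_test.py; all field values below are hypothetical.

from cdmtaskservice.condor.config import CondorClientConfig

cfg = CondorClientConfig(
    initial_dir="/var/log/cts/condor",           # hypothetical log dir on the scheduler host
    service_root_url="https://cts.example.com",  # hypothetical service root URL
    executable_url_override=None,                # None -> derive from service_root_url
    code_archive_url_override=None,              # None -> derive from service_root_url
    client_group="cts",                          # hypothetical CLIENTGROUP classad value
    token_path="/etc/cts/token",                 # hypothetical path on the condor worker
    s3_access_secret_path="/etc/cts/s3.secret",  # hypothetical path on the condor worker
)

# With no overrides set, both URLs are derived from service_root_url by the
# cdmtaskservice.localfiles helpers; a non-None override is used in place of
# the default location (per the field docstrings above).
print(cfg.get_executable_url())
print(cfg.get_code_archive_url())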