Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import os
from pathlib import Path
from typing import NamedTuple, Callable
from yarl import URL

from cdmtaskservice.config_s3 import S3Config
from cdmtaskservice.config import CDMTaskServiceConfig
Expand All @@ -28,7 +27,6 @@
from cdmtaskservice.jobflows.lawrencium_jaws import LawrenciumJAWSRunner
from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner
from cdmtaskservice.job_state import JobState
from cdmtaskservice.localfiles import get_condor_exe_url, get_code_archive_url
from cdmtaskservice.notifications.kafka_notifications import KafkaNotifier
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.nersc.client import NERSCSFAPIClientProvider
Expand Down Expand Up @@ -265,19 +263,12 @@ def _register_kbase_job_flow(
kafka_notifier: KafkaNotifier,
coman: CoroutineWrangler
):
exe_ov = URL(cfg.condor_exe_url_override) if cfg.condor_exe_url_override else None
code_ov = URL(cfg.code_archive_url_override) if cfg.code_archive_url_override else None
kbase_provider = KBaseFlowProvider.create(
Path(cfg.condor_initialdir),
str(get_condor_exe_url(base_url=URL(cfg.service_root_url), override=exe_ov)),
str(get_code_archive_url(base_url=URL(cfg.service_root_url), override=code_ov)),
cfg.get_condor_client_config(),
mongodao,
s3config,
kafka_notifier,
coman,
cfg.service_root_url,
cfg.get_condor_paths(),
condor_client_group=cfg.condor_clientgroup,
)
flowman.register_flow(KBaseRunner.CLUSTER, kbase_provider.get_kbase_job_flow)

Expand Down
74 changes: 14 additions & 60 deletions cdmtaskservice/condor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@

import asyncio
from classad2 import ClassAd, ExprTree
from dataclasses import dataclass
import htcondor2
from pathlib import Path
import posixpath
from typing import Any
from yarl import URL

from cdmtaskservice.arg_checkers import (
not_falsy as _not_falsy,
require_string as _require_string,
check_num as _check_num,
)
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, check_num as _check_num
from cdmtaskservice.condor.config import CondorClientConfig
from cdmtaskservice.config_s3 import S3Config
from cdmtaskservice import models

Expand Down Expand Up @@ -106,69 +102,28 @@
}


@dataclass(frozen=True)
class HTCondorWorkerPaths:
""" Paths where external executors should look up secrets. """

# TODO CODE check args aren't None / whitespace only. Not a lib class so YAGNI for now

token_path: str
"""
The path on the condor worker containing a KBase token for use when
contacting the service.
"""

s3_access_secret_path: str
"""
The path on the condor worker containing the s3 access secret for the S3 instance.
"""


class CondorClient:
"""
The condor client.
"""

def __init__( # This is getting too long
self,
schedd: htcondor2.Schedd,
initial_dir: Path,
service_root_url: str,
executable_url: str,
code_archive_url: str,
s3config: S3Config,
paths: HTCondorWorkerPaths,
client_group: str | None = None,
):
def __init__(self, schedd: htcondor2.Schedd, config: CondorClientConfig, s3config: S3Config):
"""
Create the client.

schedd: An htcondor Schedd instance, configured to submit jobs to the cluster.
initial_dir - where the job logs should be stored on the condor scheduler host.
The path must exist there and locally.
service_root_url - the URL of the service root, used by the remote job to get and update
job state.
executable_url - the url for the executable to run in the condor worker for each job.
Must end in a file name and have no query or fragment.
code_archive_url - the url for the *.tgz file code archive to transfer to the condor worker
for each job. Must end in a file name and have no query or fragment.
config - the configuration for the client.
s3Config - the configuration for the S3 instance where files are stored.
paths - the paths on the HTCondor workers should look for job secrets.
client_group - the client group to submit jobs to, if any. This is a classad on
a worker with the name CLIENTGROUP.
"""
self._schedd = _not_falsy(schedd, "schedd")
self._initial_dir = _not_falsy(initial_dir, "initial_dir")
self._config = _not_falsy(config, "config")
# Why this has to exist locally is beyond me
self._initial_dir.mkdir(parents=True, exist_ok=True)
self._service_root_url = _require_string(service_root_url, "service_root_url")
self._exe_url = _require_string(executable_url, "executable_url")
Path(self._config.initial_dir).mkdir(parents=True, exist_ok=True)
self._exe_url = config.get_executable_url()
self._exe_name = self._get_name_from_url(self._exe_url)
self._code_archive_url = _require_string(code_archive_url, "code_archive_url")
self._code_archive_url = config.get_code_archive_url()
self._code_archive_name = self._get_name_from_url(self._code_archive_url)
self._s3config = _not_falsy(s3config, "s3config")
self._paths = _not_falsy(paths, "paths")
self._cligrp = client_group

def _get_name_from_url(self, url: str) -> str:
parsed = URL(url)
Expand All @@ -187,13 +142,12 @@ def _get_environment(self, job: models.Job) -> str:
"JOB_ID": job.id,
"CONTAINER_NUMBER": "$(container_number)",
"CODE_ARCHIVE": self._code_archive_name,
"SERVICE_ROOT_URL": self._service_root_url,
"TOKEN_PATH": self._paths.token_path,
"SERVICE_ROOT_URL": self._config.service_root_url,
"TOKEN_PATH": self._config.token_path,
"S3_URL": self._s3config.internal_url, # could add a toggle to use external if needed
"S3_ACCESS_KEY": self._s3config.access_key,
"S3_SECRET_PATH": self._paths.s3_access_secret_path,
"S3_SECRET_PATH": self._config.s3_access_secret_path,
"S3_ERROR_LOG_PATH": self._s3config.error_log_path,
# TODO CONDOR need minio url
}
if self._s3config.insecure:
env["S3_INSECURE"] = "TRUE"
Expand All @@ -211,7 +165,7 @@ def _get_sub(self, job: models.Job) -> tuple[htcondor2.Submit, list[dict[str, st
"shell": f"bash {self._exe_name}",
# Has to exist locally and on the condor Schedd host
# Which doesn't make any sense
"initialdir": str(self._initial_dir),
"initialdir": self._config.initial_dir,
"transfer_input_files": f"{self._exe_url}, {self._code_archive_url}",
"environment": self._get_environment(job),
"output": f"cts/{job.id}/cts-{job.id}-$(container_number).out",
Expand All @@ -232,9 +186,9 @@ def _get_sub(self, job: models.Job) -> tuple[htcondor2.Submit, list[dict[str, st
f"+{_AD_JOB_ID}": f'"{job.id}"', # must be quoted
f"+{_AD_CONTAINER_NUMBER}": "$(container_number)",
}
if self._cligrp:
if self._config.client_group:
# HTCondor will && this with its own requirements
subdict["requirements"] = f'(CLIENTGROUP == "{self._cligrp}")'
subdict["requirements"] = f'(CLIENTGROUP == "{self._config.client_group}")'
sub = htcondor2.Submit(subdict | STATIC_SUB)
itemdata = [
{"container_number": str(i)}
Expand Down
69 changes: 69 additions & 0 deletions cdmtaskservice/condor/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Configuration for the CTS HTCondor client.
"""

from pydantic import BaseModel, Field
from yarl import URL

from cdmtaskservice.localfiles import get_condor_exe_url, get_code_archive_url


class CondorClientConfig(BaseModel):
"""
Configuration items for the HTCondor client.
"""

initial_dir: str
"""
Where the job logs should be stored on the condor scheduler host.
The path must exist there and locally.
"""

service_root_url: str
"""
The URL of the service root, used by the remote job to get and update job state.
"""

executable_url_override: str | None
"""
A URL, if any, to use for downloading the HTCondor shell script rather than the
default location.
Must end in a file name and have no query or fragment.
"""

code_archive_url_override: str | None
"""
A URL, if any, to use for downloading the tgz code archive rather than the default location.
Must end in a file name and have no query or fragment.
"""

client_group: str | None
"""
The client group to submit jobs to, if any. This is a classad on a worker with
the name CLIENTGROUP.
"""

token_path: str
"""
The path on the condor worker containing a KBase token for use when
contacting the service.
"""

s3_access_secret_path: str
"""
The path on the condor worker containing the s3 access secret for the S3 instance.
"""

def get_executable_url(self) -> str:
"""
Get the URL where the external executor should download the shell executable for the job.
"""
exe_ov = URL(self.executable_url_override) if self.executable_url_override else None
return str(get_condor_exe_url(base_url=URL(self.service_root_url), override=exe_ov))

def get_code_archive_url(self) -> str:
"""
Get the URL where the external executor should download the CTS code archive.
"""
code_ov = URL(self.code_archive_url_override) if self.code_archive_url_override else None
return str(get_code_archive_url(base_url=URL(self.service_root_url), override=code_ov))
13 changes: 9 additions & 4 deletions cdmtaskservice/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import tomllib
from typing import BinaryIO, TextIO

from cdmtaskservice.condor.client import HTCondorWorkerPaths
from cdmtaskservice.condor.config import CondorClientConfig
from cdmtaskservice.config_s3 import S3Config
from cdmtaskservice.jaws.config import JAWSConfig
from cdmtaskservice.nersc.paths import NERSCPaths
Expand Down Expand Up @@ -249,11 +249,16 @@ def get_jaws_config(self) -> JAWSConfig:
url=self.jaws_url,
)

def get_condor_paths(self) -> HTCondorWorkerPaths:
def get_condor_client_config(self) -> CondorClientConfig:
"""
Get information about environment variables on HTcondor workers for external executors.
Get the configuration items for the condor client.
"""
return HTCondorWorkerPaths(
return CondorClientConfig(
initial_dir=self.condor_initialdir,
service_root_url=self.service_root_url,
executable_url_override=self.condor_exe_url_override,
code_archive_url_override=self.code_archive_url_override,
client_group=self.condor_clientgroup,
token_path=self.condor_token_path,
s3_access_secret_path=self.condor_s3_access_secret_path,
)
Expand Down
6 changes: 2 additions & 4 deletions cdmtaskservice/config_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

from pydantic import BaseModel, Field
from typing import Optional

from cdmtaskservice.s3.client import S3Client

Expand Down Expand Up @@ -36,11 +35,10 @@ class S3Config(BaseModel):

verify_external_url: bool = True
""" Whether to verify connectivity to the external S3 url at service startup. """


# Internal singletons
_s3_client: Optional[S3Client] = None
_s3_external_client: Optional[S3Client] = None
_s3_client: S3Client | None = None
_s3_external_client: S3Client | None = None

async def initialize_clients(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion cdmtaskservice/jobflows/jaws_flows_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def close(self):
self._closed = True
await self._nersc_status_cli.close()
if self._sfapi_client:
await self._flow_builds.sfapi_client.destroy()
await self._sfapi_client.destroy()
if isinstance(self._build_state, _Dependencies):
await self._build_state.jaws_client.close()

Expand Down
Loading
Loading