From a8fdf6a1076158740166b8de10ab9f63765f348f Mon Sep 17 00:00:00 2001
From: Quentin Long
Date: Mon, 8 Sep 2025 17:36:38 -0700
Subject: [PATCH 1/2] Draft of tron POC

---
 paasta_tools/kubernetes/remote_run.py |  63 ++++-
 paasta_tools/tron_tools.py            | 372 ++++++++++++++++++++++++++
 2 files changed, 422 insertions(+), 13 deletions(-)

diff --git a/paasta_tools/kubernetes/remote_run.py b/paasta_tools/kubernetes/remote_run.py
index 95fbc089ab..f85f0f4037 100644
--- a/paasta_tools/kubernetes/remote_run.py
+++ b/paasta_tools/kubernetes/remote_run.py
@@ -43,6 +43,9 @@
 from paasta_tools.kubernetes_tools import KubeClient
 from paasta_tools.kubernetes_tools import limit_size_with_hash
 from paasta_tools.kubernetes_tools import paasta_prefixed
+from paasta_tools.tron_tools import format_tron_action_dict
+from paasta_tools.tron_tools import load_tron_instance_configs
+from paasta_tools.tron_tools import TronActionConfig
 from paasta_tools.utils import load_system_paasta_config
 from paasta_tools.utils import NoConfigurationForServiceError
 
@@ -79,6 +82,22 @@ def format_remote_run_job_name(
     return limit_size_with_hash(f"remote-run-{user}-{job_name}")
 
 
+def load_tron_config(service: str, instance: str, cluster: str) -> TronActionConfig:
+    actions = load_tron_instance_configs(service, cluster)
+    for action in actions:
+        if action.instance == instance:
+            break
+    else:
+        raise RemoteRunError(f"No instance {instance} found for {service}")
+
+    if action.get_executor() != "paasta":
+        raise RemoteRunError(
+            f"{instance} is not a paasta executor action and is not compatible with remote-run"
+        )
+
+    return action
+
+
 def load_eks_or_adhoc_deployment_config(
     service: str,
     instance: str,
@@ -133,16 +152,26 @@ def remote_run_start(
     """
     kube_client = KubeClient()
 
-    # Load the service deployment settings
-    deployment_config = load_eks_or_adhoc_deployment_config(
-        service, instance, cluster, is_toolbox, user
-    )
+    tron = False
+    try:
+        # Load the service deployment settings
+        deployment_config = load_eks_or_adhoc_deployment_config(
+            service, instance, cluster, is_toolbox, user
+        )
+    except:
+        # tron
+        tron = True
+        deployment_config = load_tron_config(service, instance, cluster)
 
     # Set override command, or sleep for interactive mode
-    if command and not is_toolbox:
-        deployment_config.config_dict["cmd"] = command
-    elif interactive and not is_toolbox:
-        deployment_config.config_dict["cmd"] = f"sleep {max_duration}"
+    if not tron:
+        if command and not is_toolbox:
+            deployment_config.config_dict["cmd"] = command
+        elif interactive and not is_toolbox:
+            deployment_config.config_dict["cmd"] = f"sleep {max_duration}"
+    else:
+        # Tron dicts use "command" instead of "cmd" and expect an array
+        deployment_config.config_dict["command"] = ["/usr/bin/sleep", str(max_duration)]
 
     # Create the app with a new name
     formatted_job = deployment_config.format_kubernetes_job(
@@ -209,9 +238,12 @@ def remote_run_ready(
     kube_client = KubeClient()
 
     # Load the service deployment settings
-    deployment_config = load_eks_or_adhoc_deployment_config(
-        service, instance, cluster, is_toolbox, user
-    )
+    try:
+        deployment_config = load_eks_or_adhoc_deployment_config(
+            service, instance, cluster, is_toolbox, user
+        )
+    except:
+        deployment_config = load_tron_config(service, instance, cluster)
 
     namespace = deployment_config.get_namespace()
     pod = find_job_pod(kube_client, namespace, job_name)
@@ -289,8 +321,13 @@ def remote_run_token(
     """
     kube_client = KubeClient()
 
-    # Load the service deployment settings
-    deployment_config = 
load_eks_or_adhoc_deployment_config(service, instance, cluster) + try: + # Load the service deployment settings + deployment_config = load_eks_or_adhoc_deployment_config( + service, instance, cluster + ) + except: + deployment_config = load_tron_config(service, instance, cluster) namespace = deployment_config.get_namespace() # Rebuild the job metadata diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index f5c7758a30..02b87399de 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -23,6 +23,7 @@ from typing import cast from typing import List from typing import Mapping +from typing import Sequence from typing import Tuple from typing import Union @@ -70,6 +71,25 @@ limit_size_with_hash, raw_selectors_to_requirements, to_node_label, + V1EnvVar, + V1ResourceRequirements, + V1SecurityContext, + V1Capabilities, + V1Container, + V1EnvVarSource, + V1ObjectFieldSelector, + V1PodTemplateSpec, + V1Job, + V1JobSpec, + V1Affinity, + V1ObjectMeta, + V1PodSpec, + AwsEbsVolume, + paasta_prefixed, + JOB_TYPE_LABEL_NAME, + CAPS_DROP, + get_git_sha_from_dockerurl, + load_service_namespace_config, ) from paasta_tools.secret_tools import is_secret_ref from paasta_tools.secret_tools import is_shared_secret @@ -77,6 +97,7 @@ from paasta_tools.secret_tools import get_secret_name_from_ref from paasta_tools.kubernetes_tools import get_paasta_secret_name from paasta_tools.kubernetes_tools import add_volumes_for_authenticating_services +from paasta_tools.kubernetes_tools import sanitise_kubernetes_name from paasta_tools.secret_tools import SHARED_SECRET_SERVICE from paasta_tools import monitoring_tools @@ -724,6 +745,357 @@ def get_projected_sa_volumes(self) -> Optional[List[ProjectedSAVolume]]: ) return projected_volumes if projected_volumes else None + def get_kubernetes_service_account_name(self) -> Optional[str]: + return self.config_dict.get("service_account_name", None) + + def get_kubernetes_secret_env_vars( + self, + secret_env_vars: Mapping[str, str], + shared_secret_env_vars: Mapping[str, str], + ) -> Sequence[V1EnvVar]: + ret = [] + for k, v in secret_env_vars.items(): + secret = get_secret_name_from_ref(v) + ret.append( + V1EnvVar( + name=k, + value_from=V1EnvVarSource( + secret_key_ref=V1SecretKeySelector( + name=get_paasta_secret_name( + self.get_namespace(), self.get_service(), secret + ), + key=secret, + optional=False, + ) + ), + ) + ) + for k, v in shared_secret_env_vars.items(): + secret = get_secret_name_from_ref(v) + ret.append( + V1EnvVar( + name=k, + value_from=V1EnvVarSource( + secret_key_ref=V1SecretKeySelector( + name=get_paasta_secret_name( + self.get_namespace(), SHARED_SECRET_SERVICE, secret + ), + key=secret, + optional=False, + ) + ), + ) + ) + return ret + + def get_env_vars_that_use_secrets(self) -> Tuple[Dict[str, str], Dict[str, str]]: + """Returns two dictionaries of environment variable name->value; the first is vars that use non-shared + secrets, and the second is vars that use shared secrets. + + The values of the dictionaries are the secret refs as formatted in yelpsoa-configs, e.g. "SECRET(foo)" + or "SHARED_SECRET(bar)". These can be decoded with get_secret_name_from_ref. 
+ """ + secret_env_vars = {} + shared_secret_env_vars = {} + for k, v in self.get_env().items(): + if is_secret_ref(v): + if is_shared_secret(v): + shared_secret_env_vars[k] = v + else: + secret_env_vars[k] = v + return secret_env_vars, shared_secret_env_vars + + def get_kubernetes_environment(self) -> List[V1EnvVar]: + kubernetes_env = [ + V1EnvVar( + name="PAASTA_POD_IP", + value_from=V1EnvVarSource( + field_ref=V1ObjectFieldSelector(field_path="status.podIP") + ), + ), + V1EnvVar( + # this is used by some functions of operator-sdk + # it uses this environment variable to get the pods + name="POD_NAME", + value_from=V1EnvVarSource( + field_ref=V1ObjectFieldSelector(field_path="metadata.name") + ), + ), + V1EnvVar( + name="PAASTA_HOST", + value_from=V1EnvVarSource( + field_ref=V1ObjectFieldSelector(field_path="spec.nodeName") + ), + ), + V1EnvVar( + name="PAASTA_CLUSTER", + value_from=V1EnvVarSource( + field_ref=V1ObjectFieldSelector( + field_path="metadata.labels['" + + paasta_prefixed("cluster") + + "']" + ) + ), + ), + ] + return kubernetes_env + + def get_container_env(self) -> Sequence[V1EnvVar]: + secret_env_vars, shared_secret_env_vars = self.get_env_vars_that_use_secrets() + + user_env = [ + V1EnvVar(name=name, value=value) + for name, value in self.get_env().items() + if name + not in list(secret_env_vars.keys()) + list(shared_secret_env_vars.keys()) + ] + user_env += self.get_kubernetes_secret_env_vars( + secret_env_vars=secret_env_vars, + shared_secret_env_vars=shared_secret_env_vars, + ) + return user_env + self.get_kubernetes_environment() # type: ignore + + def get_resource_requirements(self) -> V1ResourceRequirements: + limits = { + "cpu": self.get_cpus() + self.get_cpu_burst_add(), + "memory": f"{self.get_mem()}Mi", + "ephemeral-storage": f"{self.get_disk()}Mi", + } + requests = { + "cpu": self.get_cpus(), + "memory": f"{self.get_mem()}Mi", + "ephemeral-storage": f"{self.get_disk()}Mi", + } + return V1ResourceRequirements(limits=limits, requests=requests) + + def get_sanitised_instance_name(self) -> str: + return sanitise_kubernetes_name(self.get_action_name()) + + def get_security_context(self) -> Optional[V1SecurityContext]: + cap_add = self.config_dict.get("cap_add", None) + context_kwargs = ( + # passing parameter like this to avoid all services to bounce + # when this change is released + {"privileged": self.config_dict["privileged"]} + if "privileged" in self.config_dict + else {} + ) + if cap_add is None: + return V1SecurityContext( + capabilities=V1Capabilities(drop=CAPS_DROP), + **context_kwargs, + ) + else: + return V1SecurityContext( + # XXX: we should probably generally work in sets, but V1Capabilities is typed as accepting + # lists of string only + capabilities=V1Capabilities( + add=cap_add, + # NOTE: this is necessary as containerd differs in behavior from dockershim: in dockershim + # dropped capabilities were overriden if the same capability was added - but in containerd + # the dropped capabilities appear to have higher priority. 
+ # WARNING: this must be sorted - otherwise the order of the capabilities will be different + # on every setup_kubernetes_job run and cause unnecessary redeployments + drop=sorted(list(set(CAPS_DROP) - set(cap_add))), + ), + **context_kwargs, + ) + + def get_kubernetes_container( + self, + docker_volumes: Sequence[DockerVolume], + system_paasta_config: SystemPaastaConfig, + aws_ebs_volumes: Sequence[AwsEbsVolume], + secret_volumes: Sequence[TronSecretVolume], + ) -> V1Container: + service_container = V1Container( + image=self.get_docker_url(), + command=self.get_cmd(), + args=self.get_args(), + env=self.get_container_env(), + resources=self.get_resource_requirements(), + name=self.get_sanitised_instance_name(), + security_context=self.get_security_context(), + volume_mounts=[], # TODO + # volume_mounts=self.get_volume_mounts( + # docker_volumes=docker_volumes, + # aws_ebs_volumes=aws_ebs_volumes, + # persistent_volumes=self.get_persistent_volumes(), + # secret_volumes=secret_volumes, + # projected_sa_volumes=self.get_projected_sa_volumes(), + # ), + ) + return service_container + + def get_pod_template_spec( + self, + git_sha: str, + system_paasta_config: SystemPaastaConfig, + restart_on_failure: bool = False, + include_sidecars: bool = False, + force_no_routable_ip: bool = False, + include_liveness_probe: bool = False, + include_readiness_probe: bool = False, + ) -> V1PodTemplateSpec: + service_namespace_config = load_service_namespace_config( + service=self.service, namespace=self.get_nerve_namespace() + ) + docker_volumes = self.get_volumes( + system_volumes=system_paasta_config.get_volumes(), + ) + + hacheck_sidecar_volumes = system_paasta_config.get_hacheck_sidecar_volumes() + annotations: KubePodAnnotations = { + # "smartstack_registrations": json.dumps(self.get_registrations()), + "paasta.yelp.com/routable_ip": "false" + } + + pod_spec_kwargs = {} + pod_spec_kwargs.update(system_paasta_config.get_pod_defaults()) + pod_spec_kwargs.update( + service_account_name=self.get_kubernetes_service_account_name(), + containers=[ + self.get_kubernetes_container( + docker_volumes=docker_volumes, + aws_ebs_volumes=self.get_aws_ebs_volumes(), + secret_volumes=self.get_secret_volumes(), + system_paasta_config=system_paasta_config, + ) + ], + share_process_namespace=True, + node_selector=self.get_node_selectors(), + restart_policy="Never", + volumes=[], # TODO + # volumes=self.get_pod_volumes( + # docker_volumes=docker_volumes + hacheck_sidecar_volumes, + # aws_ebs_volumes=self.get_aws_ebs_volumes(), + # secret_volumes=self.get_secret_volumes(), + # projected_sa_volumes=self.get_projected_sa_volumes(), + # ), + ) + # need to check if there are node selectors/affinities. if there are none + # and we create an empty affinity object, k8s will deselect all nodes. + node_affinity = self.get_node_affinities( + # system_paasta_config.get_pool_node_affinities() + ) # TODO + if node_affinity is not None: + node_affinity = node_affinity[0] + pod_spec_kwargs["affinity"] = V1Affinity(node_affinity=node_affinity) + + fs_group = None + if self.get_iam_role_provider() == "aws": + annotations["iam.amazonaws.com/role"] = "" + iam_role = self.get_iam_role() + if iam_role: + pod_spec_kwargs["service_account_name"] = get_service_account_name( + iam_role + ) + if fs_group is None: + # We need some reasoable default for group id of a process + # running inside the container. Seems like most of such + # programs run as `nobody`, let's use that as a default. 
+ # + # PAASTA-16919: This should be removed when + # https://github.com/aws/amazon-eks-pod-identity-webhook/issues/8 + # is fixed. + fs_group = 65534 + else: + annotations["iam.amazonaws.com/role"] = self.get_iam_role() + + if fs_group is not None: + pod_spec_kwargs["security_context"] = V1PodSecurityContext( + fs_group=fs_group + ) + + # Default Pod labels + labels: KubePodLabels = { + "yelp.com/paasta_service": self.get_service(), + "yelp.com/paasta_instance": self.get_instance(), + "yelp.com/paasta_git_sha": git_sha, + # NOTE: we can't use the paasta_prefixed() helper here + # since mypy expects TypedDict keys to be string literals + "paasta.yelp.com/service": self.get_service(), + "paasta.yelp.com/instance": self.get_instance(), + "paasta.yelp.com/git_sha": git_sha, + "paasta.yelp.com/pool": self.get_pool(), + "paasta.yelp.com/cluster": self.cluster, + "yelp.com/owner": "compute_infra_platform_experience", + "paasta.yelp.com/managed": "true", + } + + image_version = self.get_image_version() + if image_version is not None: + labels["paasta.yelp.com/image_version"] = image_version + + return V1PodTemplateSpec( + metadata=V1ObjectMeta( + labels=labels, + annotations=annotations, + ), + spec=V1PodSpec(**pod_spec_kwargs), + ) + + def get_kubernetes_metadata(self, git_sha: str) -> V1ObjectMeta: + return V1ObjectMeta( + name=self.get_sanitised_instance_name(), + namespace=self.get_namespace(), + labels={ + "yelp.com/owner": "qlo", # TODO + "yelp.com/paasta_service": self.get_service(), + "yelp.com/paasta_instance": self.get_instance(), + "yelp.com/paasta_git_sha": git_sha, + paasta_prefixed("service"): self.get_service(), + paasta_prefixed("instance"): self.get_instance(), + paasta_prefixed("git_sha"): git_sha, + paasta_prefixed("cluster"): self.cluster, + paasta_prefixed("autoscaled"): "false", + paasta_prefixed("paasta.yelp.com/pool"): self.get_pool(), + paasta_prefixed("managed"): "true", + }, + ) + + def format_kubernetes_job( + self, + job_label: str, + deadline_seconds: int = 3600, + keep_routable_ip=False, + ) -> V1Job: + additional_labels = {paasta_prefixed(JOB_TYPE_LABEL_NAME): job_label} + try: + docker_url = self.get_docker_url() + git_sha = get_git_sha_from_dockerurl(docker_url, long=True) + system_paasta_config = load_system_paasta_config() + image_version = self.get_image_version() + if image_version is not None: + additional_labels[paasta_prefixed("image_version")] = image_version + pod_template = self.get_pod_template_spec( + git_sha=git_sha, + system_paasta_config=system_paasta_config, + restart_on_failure=False, + include_sidecars=False, + force_no_routable_ip=True, + include_liveness_probe=False, + include_readiness_probe=False, + ) + pod_template.metadata.labels.update(additional_labels) + complete_config = V1Job( + api_version="batch/v1", + kind="Job", + metadata=self.get_kubernetes_metadata(git_sha), + spec=V1JobSpec( + active_deadline_seconds=deadline_seconds, + ttl_seconds_after_finished=0, # remove job resource after completion + template=pod_template, + ), + ) + complete_config.metadata.labels.update(additional_labels) + except Exception as e: + raise InvalidKubernetesConfig(e, self.get_service(), self.get_instance()) + log.debug( + f"Complete configuration for job instance is: {complete_config}", + ) + return complete_config + class TronJobConfig: """Represents a job in Tron, consisting of action(s) and job-level configuration values.""" From f7d45439d79b02998e10c3147283e48dc1ead5af Mon Sep 17 00:00:00 2001 From: Quentin Long Date: Wed, 17 Sep 2025 18:42:54 -0700 
Subject: [PATCH 2/2] More updates for supporting tron in remote-run --- paasta_tools/kubernetes/remote_run.py | 51 ++++--- paasta_tools/tron_tools.py | 204 +++++++++++++++++++++++--- 2 files changed, 214 insertions(+), 41 deletions(-) diff --git a/paasta_tools/kubernetes/remote_run.py b/paasta_tools/kubernetes/remote_run.py index f85f0f4037..ea60aeb862 100644 --- a/paasta_tools/kubernetes/remote_run.py +++ b/paasta_tools/kubernetes/remote_run.py @@ -43,7 +43,6 @@ from paasta_tools.kubernetes_tools import KubeClient from paasta_tools.kubernetes_tools import limit_size_with_hash from paasta_tools.kubernetes_tools import paasta_prefixed -from paasta_tools.tron_tools import format_tron_action_dict from paasta_tools.tron_tools import load_tron_instance_configs from paasta_tools.tron_tools import TronActionConfig from paasta_tools.utils import load_system_paasta_config @@ -158,7 +157,7 @@ def remote_run_start( deployment_config = load_eks_or_adhoc_deployment_config( service, instance, cluster, is_toolbox, user ) - except: + except NoConfigurationForServiceError: # tron tron = True deployment_config = load_tron_config(service, instance, cluster) @@ -174,11 +173,14 @@ def remote_run_start( deployment_config.config_dict["command"] = ["/usr/bin/sleep", str(max_duration)] # Create the app with a new name - formatted_job = deployment_config.format_kubernetes_job( - job_label=REMOTE_RUN_JOB_LABEL, - deadline_seconds=max_duration, - keep_routable_ip=is_toolbox, - ) + format_k8s_job_params = { + "job_label": REMOTE_RUN_JOB_LABEL, + "deadline_seconds": max_duration, + "keep_routable_ip": is_toolbox, + } + if tron: + format_k8s_job_params["username"] = user + formatted_job = deployment_config.format_kubernetes_job(**format_k8s_job_params) job_name = format_remote_run_job_name(formatted_job.metadata.name, user) formatted_job.metadata.name = job_name app_wrapper = get_application_wrapper(formatted_job) @@ -242,7 +244,7 @@ def remote_run_ready( deployment_config = load_eks_or_adhoc_deployment_config( service, instance, cluster, is_toolbox, user ) - except: + except NoConfigurationForServiceError: deployment_config = load_tron_config(service, instance, cluster) namespace = deployment_config.get_namespace() @@ -286,14 +288,22 @@ def remote_run_stop( kube_client = KubeClient() # Load the service deployment settings - deployment_config = load_eks_or_adhoc_deployment_config( - service, instance, cluster, is_toolbox, user - ) + tron = False + try: + deployment_config = load_eks_or_adhoc_deployment_config( + service, instance, cluster, is_toolbox, user + ) + except NoConfigurationForServiceError: + tron = True + deployment_config = load_tron_config(service, instance, cluster) # Rebuild the job metadata - formatted_job = deployment_config.format_kubernetes_job( - job_label=REMOTE_RUN_JOB_LABEL - ) + format_k8s_job_params = { + "job_label": REMOTE_RUN_JOB_LABEL, + } + if tron: + format_k8s_job_params["username"] = user + formatted_job = deployment_config.format_kubernetes_job(**format_k8s_job_params) job_name = format_remote_run_job_name(formatted_job.metadata.name, user) formatted_job.metadata.name = job_name @@ -321,19 +331,24 @@ def remote_run_token( """ kube_client = KubeClient() + tron = False try: # Load the service deployment settings deployment_config = load_eks_or_adhoc_deployment_config( service, instance, cluster ) - except: + except NoConfigurationForServiceError: deployment_config = load_tron_config(service, instance, cluster) + tron = True namespace = deployment_config.get_namespace() # Rebuild the job 
metadata - formatted_job = deployment_config.format_kubernetes_job( - job_label=REMOTE_RUN_JOB_LABEL - ) + format_k8s_job_params = { + "job_label": REMOTE_RUN_JOB_LABEL, + } + if tron: + format_k8s_job_params["username"] = user + formatted_job = deployment_config.format_kubernetes_job(**format_k8s_job_params) job_name = format_remote_run_job_name(formatted_job.metadata.name, user) # Find pod and create exec token for it diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 02b87399de..b227c14736 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -13,6 +13,7 @@ import datetime import difflib import glob +import hashlib import json import logging import os @@ -64,6 +65,17 @@ from paasta_tools import spark_tools from paasta_tools.kubernetes_tools import ( + mode_to_int, + V1VolumeProjection, + PROJECTED_SA_TOKEN_PATH, + DEFAULT_PROJECTED_SA_EXPIRATION_SECONDS, + V1KeyToPath, + V1Volume, + V1ServiceAccountTokenProjection, + V1SecretVolumeSource, + V1ProjectedVolumeSource, + V1HostPathVolumeSource, + VolumeWithMode, NodeSelectorConfig, allowlist_denylist_to_requirements, contains_zone_label, @@ -84,12 +96,17 @@ V1Affinity, V1ObjectMeta, V1PodSpec, + V1SecretKeySelector, + V1VolumeMount, AwsEbsVolume, paasta_prefixed, JOB_TYPE_LABEL_NAME, CAPS_DROP, get_git_sha_from_dockerurl, - load_service_namespace_config, + KubePodAnnotations, + V1PodSecurityContext, + KubePodLabels, + InvalidKubernetesConfig, ) from paasta_tools.secret_tools import is_secret_ref from paasta_tools.secret_tools import is_shared_secret @@ -901,6 +918,154 @@ def get_security_context(self) -> Optional[V1SecurityContext]: **context_kwargs, ) + def get_sanitised_volume_name(self, volume_name: str, length_limit: int = 0) -> str: + """I know but we really aren't allowed many characters...""" + volume_name = volume_name.rstrip("/") + sanitised = volume_name.replace("/", "slash-").replace(".", "dot-") + sanitised_name = sanitise_kubernetes_name(sanitised) + if length_limit and len(sanitised_name) > length_limit: + sanitised_name = ( + sanitised_name[0 : length_limit - 6] + + "--" + + hashlib.md5(sanitised_name.encode("ascii")).hexdigest()[:4] + ) + return sanitised_name + + def get_docker_volume_name(self, docker_volume: DockerVolume) -> str: + return self.get_sanitised_volume_name( + "host--{name}".format(name=docker_volume["hostPath"]), length_limit=63 + ) + + def get_aws_ebs_volume_name(self, aws_ebs_volume: AwsEbsVolume) -> str: + return self.get_sanitised_volume_name( + "aws-ebs--{name}{partition}".format( + name=aws_ebs_volume["volume_id"], + partition=aws_ebs_volume.get("partition", ""), + ) + ) + + def get_projected_sa_volume_name( + self, projected_sa_volume: ProjectedSAVolume + ) -> str: + return self.get_sanitised_volume_name( + "projected-sa--{audience}".format(audience=projected_sa_volume["audience"]), + length_limit=63, + ) + + def read_only_mode(self, d: VolumeWithMode) -> bool: + return d.get("mode", "RO") == "RO" + + def get_pod_volumes( + self, + docker_volumes: Sequence[DockerVolume], + secret_volumes: Sequence[TronSecretVolume], + projected_sa_volumes: Sequence[ProjectedSAVolume], + ) -> Sequence[V1Volume]: + pod_volumes = [] + unique_docker_volumes = { + self.get_docker_volume_name(docker_volume): docker_volume + for docker_volume in docker_volumes + } + for name, docker_volume in unique_docker_volumes.items(): + pod_volumes.append( + V1Volume( + host_path=V1HostPathVolumeSource(path=docker_volume["hostPath"]), + name=name, + ) + ) + for secret_volume in 
secret_volumes: + if "items" in secret_volume: + items = [ + V1KeyToPath( + key=item["key"], + mode=mode_to_int(item.get("mode")), + path=item["path"], + ) + for item in secret_volume["items"] + ] + else: + items = None + pod_volumes.append( + V1Volume( + name=self.get_secret_volume_name(secret_volume), + secret=V1SecretVolumeSource( + secret_name=get_paasta_secret_name( + self.get_namespace(), + self.get_service(), + secret_volume["secret_name"], + ), + default_mode=mode_to_int(secret_volume.get("default_mode")), + items=items, + optional=False, + ), + ) + ) + for projected_volume in projected_sa_volumes or []: + pod_volumes.append( + V1Volume( + name=self.get_projected_sa_volume_name(projected_volume), + projected=V1ProjectedVolumeSource( + sources=[ + V1VolumeProjection( + service_account_token=V1ServiceAccountTokenProjection( + audience=projected_volume["audience"], + expiration_seconds=projected_volume.get( + "expiration_seconds", + DEFAULT_PROJECTED_SA_EXPIRATION_SECONDS, + ), + path=PROJECTED_SA_TOKEN_PATH, + ) + ) + ], + ), + ), + ) + + return pod_volumes + + def get_volume_mounts( + self, + docker_volumes: Sequence[DockerVolume], + aws_ebs_volumes: Sequence[AwsEbsVolume], + secret_volumes: Sequence[TronSecretVolume], + projected_sa_volumes: Sequence[ProjectedSAVolume], + ) -> Sequence[V1VolumeMount]: + volume_mounts = ( + [ + V1VolumeMount( + mount_path=docker_volume["containerPath"], + name=self.get_docker_volume_name(docker_volume), + read_only=self.read_only_mode(docker_volume), + ) + for docker_volume in docker_volumes + ] + + [ + V1VolumeMount( + mount_path=aws_ebs_volume["container_path"], + name=self.get_aws_ebs_volume_name(aws_ebs_volume), + read_only=self.read_only_mode(aws_ebs_volume), + ) + for aws_ebs_volume in aws_ebs_volumes + ] + + [ + V1VolumeMount( + mount_path=volume["container_path"], + name=self.get_secret_volume_name(volume), + read_only=True, + ) + for volume in secret_volumes + ] + + [ + V1VolumeMount( + mount_path=volume["container_path"], + name=self.get_projected_sa_volume_name(volume), + read_only=True, + ) + for volume in projected_sa_volumes or [] + ] + ) + return volume_mounts + def get_kubernetes_container( self, docker_volumes: Sequence[DockerVolume], @@ -916,14 +1081,12 @@ def get_kubernetes_container( resources=self.get_resource_requirements(), name=self.get_sanitised_instance_name(), security_context=self.get_security_context(), - volume_mounts=[], # TODO - # volume_mounts=self.get_volume_mounts( - # docker_volumes=docker_volumes, - # aws_ebs_volumes=aws_ebs_volumes, - # persistent_volumes=self.get_persistent_volumes(), - # secret_volumes=secret_volumes, - # projected_sa_volumes=self.get_projected_sa_volumes(), - # ), + volume_mounts=self.get_volume_mounts( + docker_volumes=docker_volumes, + aws_ebs_volumes=aws_ebs_volumes, + secret_volumes=secret_volumes, + projected_sa_volumes=self.get_projected_sa_volumes(), + ), ) return service_container @@ -937,14 +1100,10 @@ def get_pod_template_spec( include_liveness_probe: bool = False, include_readiness_probe: bool = False, ) -> V1PodTemplateSpec: - service_namespace_config = load_service_namespace_config( - service=self.service, namespace=self.get_nerve_namespace() - ) docker_volumes = self.get_volumes( system_volumes=system_paasta_config.get_volumes(), ) - hacheck_sidecar_volumes = system_paasta_config.get_hacheck_sidecar_volumes() annotations: KubePodAnnotations = { # "smartstack_registrations": json.dumps(self.get_registrations()), "paasta.yelp.com/routable_ip": "false" @@ -965,13 +1124,11 @@ def 
get_pod_template_spec( share_process_namespace=True, node_selector=self.get_node_selectors(), restart_policy="Never", - volumes=[], # TODO - # volumes=self.get_pod_volumes( - # docker_volumes=docker_volumes + hacheck_sidecar_volumes, - # aws_ebs_volumes=self.get_aws_ebs_volumes(), - # secret_volumes=self.get_secret_volumes(), - # projected_sa_volumes=self.get_projected_sa_volumes(), - # ), + volumes=self.get_pod_volumes( + docker_volumes=docker_volumes, + secret_volumes=self.get_secret_volumes(), + projected_sa_volumes=self.get_projected_sa_volumes(), + ), ) # need to check if there are node selectors/affinities. if there are none # and we create an empty affinity object, k8s will deselect all nodes. @@ -1035,12 +1192,12 @@ def get_pod_template_spec( spec=V1PodSpec(**pod_spec_kwargs), ) - def get_kubernetes_metadata(self, git_sha: str) -> V1ObjectMeta: + def get_kubernetes_metadata(self, git_sha: str, username: str) -> V1ObjectMeta: return V1ObjectMeta( name=self.get_sanitised_instance_name(), namespace=self.get_namespace(), labels={ - "yelp.com/owner": "qlo", # TODO + "yelp.com/owner": username, "yelp.com/paasta_service": self.get_service(), "yelp.com/paasta_instance": self.get_instance(), "yelp.com/paasta_git_sha": git_sha, @@ -1057,6 +1214,7 @@ def get_kubernetes_metadata(self, git_sha: str) -> V1ObjectMeta: def format_kubernetes_job( self, job_label: str, + username: str, deadline_seconds: int = 3600, keep_routable_ip=False, ) -> V1Job: @@ -1081,7 +1239,7 @@ def format_kubernetes_job( complete_config = V1Job( api_version="batch/v1", kind="Job", - metadata=self.get_kubernetes_metadata(git_sha), + metadata=self.get_kubernetes_metadata(git_sha, username), spec=V1JobSpec( active_deadline_seconds=deadline_seconds, ttl_seconds_after_finished=0, # remove job resource after completion
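
Notes on the intended flow (illustrative, not part of the diffs above): each remote-run entry point now tries the EKS/adhoc deployment config first and falls back to the Tron action config when the service/instance pair has no long-running or adhoc configuration. A minimal sketch of that resolution, assuming the post-patch module layout; resolve_remote_run_config is a hypothetical helper name, not something the series adds:

# Sketch of the config resolution used by remote_run_start/ready/stop/token
# after this series; the imports assume the patched paasta_tools modules.
from paasta_tools.kubernetes.remote_run import (
    load_eks_or_adhoc_deployment_config,
    load_tron_config,
)
from paasta_tools.utils import NoConfigurationForServiceError


def resolve_remote_run_config(service, instance, cluster, is_toolbox, user):
    try:
        # Long-running and adhoc instances are looked up first
        return (
            load_eks_or_adhoc_deployment_config(
                service, instance, cluster, is_toolbox, user
            ),
            False,  # not a Tron action
        )
    except NoConfigurationForServiceError:
        # Fall back to Tron; load_tron_config raises RemoteRunError for
        # missing actions or actions that do not use the paasta executor
        return load_tron_config(service, instance, cluster), True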
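load_tron_config itself resolves a Tron action by its compound instance name and rejects anything not run by the paasta executor. A hedged usage example follows; the service, instance, and cluster values are made up, and the RemoteRunError import assumes the exception lives in the same module, as its use in the diff suggests:

from paasta_tools.kubernetes.remote_run import RemoteRunError, load_tron_config

try:
    action_config = load_tron_config(
        service="example_service",          # hypothetical service name
        instance="nightly_batch.compute",   # Tron instances are "<job>.<action>"
        cluster="example-cluster",          # hypothetical cluster name
    )
    assert action_config.get_executor() == "paasta"
except RemoteRunError:
    # No matching action, or the action is not a paasta-executor action
    raise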
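One detail worth calling out from the first patch: the override command is stored differently depending on the config type, as a shell string for service/adhoc configs and as an argv-style list for Tron actions. In plain-dict terms:

# Shape difference only; real values come from the loaders above.
service_override = {"cmd": "sleep 1800"}                  # shell string
tron_override = {"command": ["/usr/bin/sleep", "1800"]}   # argv-style list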