Skip to content

Commit

Permalink
Flow configured envs from client to kernel pod (#1164)
Browse files Browse the repository at this point in the history
* Ensure envs properly flow to kernel pods on k8s
  • Loading branch information
kevin-bates authored Oct 14, 2022
1 parent 15c6047 commit 4f4e6de
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 35 deletions.
17 changes: 10 additions & 7 deletions enterprise_gateway/services/processproxies/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ async def launch_process(
self, kernel_cmd: str, **kwargs: dict[str, Any] | None
) -> "KubernetesProcessProxy":
"""Launches the specified process within a Kubernetes environment."""
# Set env before superclass call so we see these in the debug output
# Set env before superclass call, so we can see these in the debug output

# Kubernetes relies on many internal env variables. Since EG is running in a k8s pod, we will
# transfer its env to each launched kernel.
kwargs["env"] = dict(os.environ, **kwargs["env"])
# Kubernetes relies on internal env variables to determine its configuration. When
# running within a K8s cluster, these start with KUBERNETES_SERVICE, otherwise look
# for envs prefixed with KUBECONFIG.
for key in os.environ:
if key.startswith("KUBECONFIG") or key.startswith("KUBERNETES_SERVICE"):
kwargs["env"][key] = os.environ[key]

# Determine pod name and namespace - creating the latter if necessary
self.kernel_pod_name = self._determine_kernel_pod_name(**kwargs)
self.kernel_namespace = self._determine_kernel_namespace(
**kwargs
) # will create namespace if not provided
self.kernel_namespace = self._determine_kernel_namespace(**kwargs)

await super().launch_process(kernel_cmd, **kwargs)
return self
Expand Down
25 changes: 5 additions & 20 deletions etc/kernel-launchers/kubernetes/scripts/kernel-pod.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,12 @@ spec:
fsGroup: 100
{% endif %}
containers:
- env:
- name: PORT_RANGE
value: "{{ port_range }}"
- name: RESPONSE_ADDRESS
value: "{{ response_address }}"
- name: PUBLIC_KEY
value: "{{ public_key }}"
- name: KERNEL_ID
value: "{{ kernel_id }}"
- name: KERNEL_LANGUAGE
value: "{{ kernel_language }}"
- name: KERNEL_NAME
value: "{{ kernel_name }}"
- name: KERNEL_NAMESPACE
value: "{{ kernel_namespace }}"
- name: KERNEL_SPARK_CONTEXT_INIT_MODE
value: "{{ kernel_spark_context_init_mode }}"
- name: KERNEL_USERNAME
value: "{{ kernel_username }}"
image: "{{ kernel_image }}"
- image: "{{ kernel_image }}"
name: "{{ kernel_pod_name }}"
env:
# Add any custom envs here that aren't already configured for the kernel's environment
# - name: MY_CUSTOM_ENV
# value: "my_custom_value"
{% if kernel_cpus is defined or kernel_memory is defined or kernel_gpus is defined or kernel_cpus_limit is defined or kernel_memory_limit is defined or kernel_gpus_limit is defined %}
resources:
{% if kernel_cpus is defined or kernel_memory is defined or kernel_gpus is defined %}
Expand Down
45 changes: 37 additions & 8 deletions etc/kernel-launchers/kubernetes/scripts/launch_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import argparse
import os
import sys
from typing import Dict
from typing import Dict, List

import urllib3
import yaml
Expand Down Expand Up @@ -41,6 +41,28 @@ def generate_kernel_pod_yaml(keywords):
return k8s_yaml


def extend_pod_env(pod_def: dict) -> dict:
"""Extends the pod_def.spec.containers[0].env stanza with current environment."""
env_stanza = pod_def["spec"]["containers"][0].get("env") or []

# Walk current set of template env entries and replace those found in the current
# env with their values (and record those items). Then add all others from the env
# that were not already.
processed_entries: List[str] = []
for item in env_stanza:
item_name = item.get("name")
if item_name in os.environ:
item["value"] = os.environ[item_name]
processed_entries.append(item_name)

for name, value in os.environ.items():
if name not in processed_entries:
env_stanza.append({"name": name, "value": value})

pod_def["spec"]["containers"][0]["env"] = env_stanza
return pod_def


def launch_kubernetes_kernel(
kernel_id,
port_range,
Expand All @@ -63,14 +85,21 @@ def launch_kubernetes_kernel(
# Factory values...
# Since jupyter lower cases the kernel directory as the kernel-name, we need to capture its case-sensitive
# value since this is used to locate the kernel launch script within the image.
keywords["port_range"] = port_range
keywords["public_key"] = public_key
keywords["response_address"] = response_addr
keywords["kernel_id"] = kernel_id
keywords["kernel_name"] = os.path.basename(
# Ensure these key/value pairs are reflected in the environment. We'll add these to the container's env
# stanza after the pod template is generated.
if port_range:
os.environ["PORT_RANGE"] = port_range
if public_key:
os.environ["PUBLIC_KEY"] = public_key
if response_addr:
os.environ["RESPONSE_ADDRESS"] = response_addr
if kernel_id:
os.environ["KERNEL_ID"] = kernel_id
if spark_context_init_mode:
os.environ["KERNEL_SPARK_CONTEXT_INIT_MODE"] = spark_context_init_mode
os.environ["KERNEL_NAME"] = os.path.basename(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
keywords["kernel_spark_context_init_mode"] = spark_context_init_mode

# Walk env variables looking for names prefixed with KERNEL_. When found, set corresponding keyword value
# with name in lower case.
Expand All @@ -95,7 +124,7 @@ def launch_kubernetes_kernel(
if k8s_obj.get("kind"):
if k8s_obj["kind"] == "Pod":
# print("{}".format(k8s_obj)) # useful for debug
pod_template = k8s_obj
pod_template = extend_pod_env(k8s_obj)
if pod_template_file is None:
client.CoreV1Api(client.ApiClient()).create_namespaced_pod(
body=k8s_obj, namespace=kernel_namespace
Expand Down

0 comments on commit 4f4e6de

Please sign in to comment.