Skip to content
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
75b1ffd
initial changes
hmtosi Mar 16, 2026
dea5ae8
update to include all Client parameters
hmtosi Mar 17, 2026
98b09be
remove UI components
hmtosi Mar 17, 2026
3181b0b
remove ui components
hmtosi Mar 17, 2026
0a7b6ce
remove test until functionality is confirmed
hmtosi Mar 17, 2026
64f7682
Merge branch 'main' into issue-680-configure-server
hmtosi Mar 17, 2026
2f35252
Merge branch 'main' into issue-680-configure-server
hmtosi Mar 18, 2026
ce8b94e
finish removing ui functionality
hmtosi Mar 18, 2026
2a5525e
add unit test for kfp server config
hmtosi Mar 18, 2026
a99999d
update paramter descriptions
hmtosi Mar 18, 2026
a406305
Merge branch 'main' into issue-680-configure-server
hmtosi Mar 20, 2026
9b77459
fix linting
hmtosi Mar 20, 2026
0ea0c75
change to factory architecture
hmtosi Mar 20, 2026
469368b
add factory to git tracking
hmtosi Mar 20, 2026
3633771
update documentation
hmtosi Mar 20, 2026
4a1370b
update so e2e passes
hmtosi Mar 20, 2026
808c32f
Merge branch 'main' into issue-680-configure-server
hmtosi Mar 20, 2026
92b7567
Merge branch 'kubeflow:main' into issue-680-configure-server
hmtosi Mar 24, 2026
136e2dd
update default config path
hmtosi Mar 24, 2026
0500397
update kfp authentication to allow credentials
hmtosi Mar 24, 2026
57e7e12
add authenticator file
hmtosi Mar 24, 2026
33cd1e3
Merge branch 'kubeflow:main' into issue-680-configure-server
hmtosi Mar 25, 2026
5d3ac89
Fix circular import
jesuino Mar 25, 2026
351638b
Merge pull request #2 from jesuino/credentials_circular_import
hmtosi Mar 25, 2026
f41d494
Apply suggestion from @Copilot
hmtosi Mar 25, 2026
192cded
add config env var so it can be explicitly set
hmtosi Mar 25, 2026
1a80fe5
add atomic rename suggested by copilot
hmtosi Mar 26, 2026
59703ae
update documentation
hmtosi Mar 26, 2026
3ba3ece
Merge branch 'kubeflow:main' into issue-680-configure-server
hmtosi Apr 1, 2026
481266c
separate configuration (persisted) from credentials (resolved at runt…
hmtosi Apr 1, 2026
bec5c32
prevent tokens from being saved to disk
hmtosi Apr 2, 2026
bd43a9b
Merge branch 'kubeflow:main' into issue-680-configure-server
hmtosi Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions kale/common/kfp_authenticator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# Copyright 2026 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""KFP authentication module for creating credentials at runtime."""

import logging
import os
from typing import Any

log = logging.getLogger(__name__)


class AuthResult:
"""Result from authentication containing credentials for kfp.Client.

This class holds the various authentication artifacts that can be passed
to kfp.Client(). Only one authentication method should be set at a time.

Attributes:
credentials: ServiceAccountTokenVolumeCredentials object for K8s service account auth
cookies: Cookie string for DEX-based authentication
existing_token: Bearer token string for token-based authentication
"""

def __init__(
self,
credentials: Any | None = None,
cookies: str | None = None,
existing_token: str | None = None,
):
self.credentials = credentials
self.cookies = cookies
self.existing_token = existing_token


class K8sServiceAccountTokenAuthenticator:
"""Authenticator for Kubernetes service account token-based authentication.

Creates a ServiceAccountTokenVolumeCredentials object that reads the
service account token from a file path (typically mounted by Kubernetes).
"""

def authenticate(self, params: dict[str, Any] | None = None) -> AuthResult:
"""Create credentials from Kubernetes service account token.

Args:
params: Optional dictionary containing:
- token_path: Path to service account token file.
Defaults to KF_PIPELINES_SA_TOKEN_PATH env var or standard location.

Returns:
AuthResult with ServiceAccountTokenVolumeCredentials

Raises:
FileNotFoundError: If token file doesn't exist
ValueError: If token file is empty
"""
from kfp.client import KF_PIPELINES_SA_TOKEN_PATH, ServiceAccountTokenVolumeCredentials

params = params or {}
token_path = params.get(
"token_path",
os.getenv("KF_PIPELINES_SA_TOKEN_PATH", KF_PIPELINES_SA_TOKEN_PATH),
)

# Validate token file exists and is non-empty
if not os.path.exists(token_path):
raise FileNotFoundError(
f"Service account token file not found at {token_path}. "
"Ensure you're running in a Kubernetes pod with a service account token mounted."
)

with open(token_path) as f:
token_content = f.read().strip()
if not token_content:
raise ValueError(f"Service account token file at {token_path} is empty")

log.info("Using Kubernetes service account token from %s", token_path)
credentials = ServiceAccountTokenVolumeCredentials(path=token_path)
return AuthResult(credentials=credentials)


class ExistingBearerTokenAuthenticator:
"""Authenticator for pre-existing bearer token authentication."""

def authenticate(self, params: dict[str, Any] | None = None) -> AuthResult:
"""Create credentials from an existing bearer token.

Args:
params: Dictionary containing:
- token: Bearer token string (required)

Returns:
AuthResult with bearer token

Raises:
ValueError: If token is missing or empty
"""
params = params or {}
token = params.get("token")

if not token:
raise ValueError("Bearer token is required but not provided in auth_params['token']")

log.info("Using existing bearer token for authentication")
return AuthResult(existing_token=token)


class DexAuthenticator:
"""Authenticator for DEX-based authentication using cookies."""

def authenticate(self, params: dict[str, Any] | None = None) -> AuthResult:
"""Create credentials from DEX session cookies.

Args:
params: Dictionary containing:
- cookies: Cookie string (required)

Returns:
AuthResult with cookies

Raises:
ValueError: If cookies are missing or empty
"""
params = params or {}
cookies = params.get("cookies")

if not cookies:
raise ValueError("Cookies are required but not provided in auth_params['cookies']")

log.info("Using DEX cookie-based authentication")
return AuthResult(cookies=cookies)


class NoAuthAuthenticator:
"""Authenticator for unsecured KFP endpoints (no authentication required)."""

def authenticate(self, params: dict[str, Any] | None = None) -> AuthResult:
"""Return empty credentials for unsecured endpoints.

Args:
params: Ignored

Returns:
AuthResult with no credentials set
"""
log.info("Using no authentication (unsecured endpoint)")
return AuthResult()


def get_authenticator(
auth_type: str,
) -> (
K8sServiceAccountTokenAuthenticator
| ExistingBearerTokenAuthenticator
| DexAuthenticator
| NoAuthAuthenticator
):
"""Factory function to get the appropriate authenticator for an auth type.

Args:
auth_type: Authentication type. Supported values:
- "kubernetes_service_account_token": K8s service account token
- "existing_bearer_token": Pre-existing bearer token
- "dex": DEX cookie-based authentication
- "none": No authentication

Returns:
Authenticator instance for the specified type.
Defaults to NoAuthAuthenticator for unknown types.
"""
authenticators = {
"kubernetes_service_account_token": K8sServiceAccountTokenAuthenticator(),
"existing_bearer_token": ExistingBearerTokenAuthenticator(),
"dex": DexAuthenticator(),
"none": NoAuthAuthenticator(),
}

authenticator = authenticators.get(auth_type)
if authenticator is None:
log.warning("Unknown auth_type '%s', defaulting to no authentication", auth_type)
return NoAuthAuthenticator()

return authenticator
78 changes: 78 additions & 0 deletions kale/common/kfp_client_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright 2026 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Factory for creating KFP client instances with configuration support."""

from typing import TYPE_CHECKING

import kfp

from kale.common import kfp_authenticator
from kale.config import kfp_server_config

if TYPE_CHECKING:
from kfp import Client


def get_kfp_client(
host: str | None = None,
auth_type: str | None = None,
auth_params: dict | None = None,
namespace: str | None = None,
ssl_ca_cert: str | None = None,
) -> "Client":
"""Create a KFP client with configuration.

Loads saved configuration from ~/.config/kale/kfp_server_config.json and allows
parameter overrides. Explicit parameters override saved config if they are provided.

Authentication is handled by creating credentials at runtime using the authenticator
module, so credential objects are never serialized to disk.

Args:
host: KFP API server host
auth_type: Authentication type. Supported values:
- "kubernetes_service_account_token": K8s service account token
- "existing_bearer_token": Pre-existing bearer token
- "dex": DEX cookie-based authentication
- "none": No authentication (default)
auth_params: Parameters for the authentication type (e.g., {"token": "..."})
namespace: Kubernetes namespace
ssl_ca_cert: Path to CA certificate file

Returns:
kfp.Client instance configured with provided parameters or saved config
"""
# Load saved configuration
config = kfp_server_config.load_config()

# Use parameter if provided, otherwise fall back to config
host = host or config.host
auth_type = auth_type or config.auth_type or "none"
auth_params = auth_params or config.auth_params or {}
namespace = namespace or config.namespace or "kubeflow"
ssl_ca_cert = ssl_ca_cert or config.ssl_ca_cert

# Create credentials at runtime using authenticator
authenticator = kfp_authenticator.get_authenticator(auth_type)
auth_result = authenticator.authenticate(auth_params)

return kfp.Client(
host=host,
credentials=auth_result.credentials,
cookies=auth_result.cookies,
existing_token=auth_result.existing_token,
namespace=namespace,
ssl_ca_cert=ssl_ca_cert,
)
18 changes: 7 additions & 11 deletions kale/common/kfputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import kfp
from kfp_server_api.exceptions import ApiException

from kale.common import utils
from kale.common import kfp_client_factory, utils

KFP_RUN_ID_LABEL_KEY = "pipeline/runid"
KFP_RUN_NAME_ANNOTATION_KEY = "pipelines.kubeflow.org/run_name"
Expand All @@ -40,10 +40,6 @@
log = logging.getLogger(__name__)


def _get_kfp_client(host=None, namespace: str = "kubeflow"):
return kfp.Client(host=host, namespace=namespace)


def get_pipeline_id(pipeline_name: str, host: str = None) -> str:
"""List through the existing pipelines and filter by pipeline name.

Expand All @@ -54,7 +50,7 @@ def get_pipeline_id(pipeline_name: str, host: str = None) -> str:
Returns:
The matching pipeline id. None if not found
"""
client = _get_kfp_client(host)
client = kfp_client_factory.get_kfp_client(host)
token = ""
pipeline_id = None
while pipeline_id is None and token is not None:
Expand All @@ -80,7 +76,7 @@ def get_pipeline_version_id(version_name: str, pipeline_id: str, host: str = Non
Returns:
The matching pipeline id. None if not found
"""
client = _get_kfp_client(host)
client = kfp_client_factory.get_kfp_client(host)
page_token = ""
version_id = None
while version_id is None and page_token is not None:
Expand Down Expand Up @@ -129,7 +125,7 @@ def upload_pipeline(
host: custom host when executing outside of the cluster
Returns: (pipeline_id, version_id)
"""
client = _get_kfp_client(host)
client = kfp_client_factory.get_kfp_client(host)
log.info("Uploading pipeline '%s'...", pipeline_name)
pipeline_id = get_pipeline_id(pipeline_name, host=host)
if not pipeline_id:
Expand Down Expand Up @@ -176,7 +172,7 @@ def run_pipeline(
Returns:
Pipeline run metadata
"""
client = _get_kfp_client(host)
client = kfp_client_factory.get_kfp_client(host)
log.info("Creating KFP experiment '%s'...", experiment_name)
client.create_experiment(experiment_name)
pipeline = client.get_pipeline(pipeline_id)
Expand Down Expand Up @@ -279,7 +275,7 @@ def get_experiment_from_run_id(run_id: str):
Returns: ApiExperiment - the KFP Experiment which owns the run
"""
log.info("Getting experiment from run with ID '%s'...", run_id)
client = _get_kfp_client()
client = kfp_client_factory.get_kfp_client()
run = client.runs.get_run(run_id=run_id).run
experiment_id = None
type_experiment = client.api_models.ApiResourceType.EXPERIMENT
Expand All @@ -295,7 +291,7 @@ def get_experiment_from_run_id(run_id: str):

def get_run(run_id: str, host: str = None):
"""Retrieve KFP run based on RunID."""
client = _get_kfp_client(host)
client = kfp_client_factory.get_kfp_client(host)
return client.get_run(run_id)


Expand Down
Loading
Loading