Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Example: platform-wide MLflow tracing credentials for the operator to copy into session namespaces.
# Apply to the operator namespace (same pattern as ambient-admin-langfuse-secret).
#
# kubectl create secret generic ambient-admin-mlflow-observability-secret -n <operator-ns> \
# --from-literal=MLFLOW_TRACING_ENABLED=true \
# --from-literal=MLFLOW_TRACKING_URI=https://mlflow.example.com \
# --from-literal=MLFLOW_TRACKING_AUTH=kubernetes-namespaced \
# --from-literal=MLFLOW_EXPERIMENT_NAME=ambient-code-sessions \
# --from-literal=OBSERVABILITY_BACKENDS=langfuse,mlflow
#
# Keys:
# MLFLOW_TRACING_ENABLED - "true" to enable secret copy + runner env injection
# MLFLOW_TRACKING_URI - required on the runner for MLflow tracing
# MLFLOW_EXPERIMENT_NAME - optional (runner default: ambient-code-sessions)
# MLFLOW_TRACKING_AUTH - optional auth method; use "kubernetes-namespaced" so MLflow 3.11+ on K8s sends Authorization
# (service account JWT) and X-MLFLOW-WORKSPACE (pod namespace, unless overridden)
# MLFLOW_WORKSPACE - optional; fixed workspace id for X-MLFLOW-WORKSPACE instead of pod namespace
# OBSERVABILITY_BACKENDS - optional; comma list: langfuse, mlflow (runner default: langfuse only if unset)
apiVersion: v1
kind: Secret
metadata:
  name: ambient-admin-mlflow-observability-secret
  # Placeholder: set to the operator namespace before applying.
  namespace: CHANGE_ME_OPERATOR_NAMESPACE
type: Opaque
stringData:
  MLFLOW_TRACING_ENABLED: "true"
  MLFLOW_TRACKING_URI: "https://mlflow.example.com"
  # MLflow client 3.11+; requires ambient-runner[mlflow-observability] (mlflow[kubernetes]).
  # Declared exactly once: duplicate mapping keys are invalid YAML.
  MLFLOW_TRACKING_AUTH: "kubernetes-namespaced"
  MLFLOW_EXPERIMENT_NAME: "ambient-code-sessions"
  OBSERVABILITY_BACKENDS: "langfuse,mlflow"
13 changes: 13 additions & 0 deletions components/manifests/base/core/operator-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ spec:
name: ambient-admin-langfuse-secret
key: LANGFUSE_SECRET_KEY
optional: true # Optional: only needed if Langfuse enabled
# MLflow tracing (runner); optional secret managed by platform admin
- name: MLFLOW_TRACING_ENABLED
valueFrom:
secretKeyRef:
name: ambient-admin-mlflow-observability-secret
key: MLFLOW_TRACING_ENABLED
optional: true
- name: OBSERVABILITY_BACKENDS
valueFrom:
secretKeyRef:
name: ambient-admin-mlflow-observability-secret
key: OBSERVABILITY_BACKENDS
optional: true
# Google OAuth client credentials for workspace-mcp
- name: GOOGLE_OAUTH_CLIENT_ID
valueFrom:
Expand Down
5 changes: 3 additions & 2 deletions components/operator/internal/handlers/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ func TransitionToStopped(ctx context.Context, session *unstructured.Unstructured
// Cleanup secrets
deleteCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_ = deleteAmbientVertexSecret(deleteCtx, namespace)
_ = deleteAmbientLangfuseSecret(deleteCtx, namespace)
_ = deleteAmbientVertexSecret(deleteCtx, namespace, name)
_ = deleteAmbientLangfuseSecret(deleteCtx, namespace, name)
_ = deleteAmbientMlflowObservabilitySecret(deleteCtx, namespace, name)

return nil
}
Expand Down
217 changes: 207 additions & 10 deletions components/operator/internal/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,23 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
// Also cleanup ambient-vertex secret when session is stopped
deleteCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := deleteAmbientVertexSecret(deleteCtx, sessionNamespace); err != nil {
if err := deleteAmbientVertexSecret(deleteCtx, sessionNamespace, name); err != nil {
log.Printf("Warning: Failed to cleanup %s secret from %s: %v", types.AmbientVertexSecretName, sessionNamespace, err)
// Continue - session cleanup is still successful
}

// Cleanup Langfuse secret when session is stopped
// This only deletes secrets copied by the operator (with CopiedFromAnnotation).
// The platform-wide ambient-admin-langfuse-secret in the operator namespace is never deleted.
if err := deleteAmbientLangfuseSecret(deleteCtx, sessionNamespace); err != nil {
if err := deleteAmbientLangfuseSecret(deleteCtx, sessionNamespace, name); err != nil {
log.Printf("Warning: Failed to cleanup ambient-admin-langfuse-secret from %s: %v", sessionNamespace, err)
// Continue - session cleanup is still successful
}

if err := deleteAmbientMlflowObservabilitySecret(deleteCtx, sessionNamespace, name); err != nil {
log.Printf("Warning: Failed to cleanup ambient-admin-mlflow-observability-secret from %s: %v", sessionNamespace, err)
}

return nil
}

Expand Down Expand Up @@ -576,6 +580,30 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
log.Printf("Langfuse disabled, skipping secret copy")
}

ambientMlflowObsSecretCopied := false
mlflowTracingEnabled := os.Getenv("MLFLOW_TRACING_ENABLED") != "" && os.Getenv("MLFLOW_TRACING_ENABLED") != "0" && os.Getenv("MLFLOW_TRACING_ENABLED") != "false"

if mlflowTracingEnabled {
const mlflowObsSecretName = "ambient-admin-mlflow-observability-secret"
if mlflowSecret, err := config.K8sClient.CoreV1().Secrets(operatorNamespace).Get(context.TODO(), mlflowObsSecretName, v1.GetOptions{}); err == nil {
log.Printf("Found %s in %s, copying to %s", mlflowObsSecretName, operatorNamespace, sessionNamespace)
copyCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := copySecretToNamespace(copyCtx, mlflowSecret, sessionNamespace, currentObj); err != nil {
log.Printf("Warning: Failed to copy MLflow observability secret: %v. MLflow tracing will be disabled for this session.", err)
} else {
ambientMlflowObsSecretCopied = true
log.Printf("Successfully copied %s to %s", mlflowObsSecretName, sessionNamespace)
}
} else if !errors.IsNotFound(err) {
log.Printf("Warning: Failed to check for %s in %s: %v. MLflow tracing may be disabled for this session.", mlflowObsSecretName, operatorNamespace, err)
} else {
log.Printf("Warning: MLFLOW_TRACING_ENABLED is set but %s not found in namespace %s. MLflow tracing will be disabled for this session.", mlflowObsSecretName, operatorNamespace)
}
} else {
log.Printf("MLflow tracing disabled, skipping MLflow observability secret copy")
}

// Create a Kubernetes Pod for this AgenticSession
podName := fmt.Sprintf("%s-runner", name)

Expand Down Expand Up @@ -856,11 +884,19 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
}

// Create the Pod directly (no Job wrapper for faster startup)
runnerSATokenAutomount := false
var runnerPodSAName string
if ambientMlflowObsSecretCopied {
// MLflow MLFLOW_TRACKING_AUTH=kubernetes-namespaced reads token + namespace from
// /var/run/secrets/kubernetes.io/serviceaccount/ (requires automount + session SA).
runnerSATokenAutomount = true
runnerPodSAName = fmt.Sprintf("ambient-session-%s", name)
}
podSpec := corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
TerminationGracePeriodSeconds: &terminationGrace,
// Explicitly set service account for pod creation permissions
AutomountServiceAccountToken: boolPtr(false),
ServiceAccountName: runnerPodSAName,
AutomountServiceAccountToken: boolPtr(runnerSATokenAutomount),
}

// Workspace volume: only if persistence != persistenceNone OR repos are seeded
Expand Down Expand Up @@ -1097,6 +1133,83 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
log.Printf("Langfuse env vars configured via secretKeyRef for session %s", name)
}

// Inject MLflow observability settings into the runner env via secretKeyRef, but only
// when ambientMlflowObsSecretCopied reports that the secret is present in the session
// namespace.
const mlflowObsSecretName = "ambient-admin-mlflow-observability-secret"
if ambientMlflowObsSecretCopied {
	// Every secret key is exposed under an env var of the same name. Each key must be
	// listed exactly once so the container never carries duplicate env entries
	// (MLFLOW_TRACKING_AUTH in particular must not be repeated).
	mlflowObsEnvKeys := []string{
		"MLFLOW_TRACING_ENABLED",
		"MLFLOW_TRACKING_URI",
		"MLFLOW_TRACKING_AUTH",
		"MLFLOW_EXPERIMENT_NAME",
		"MLFLOW_WORKSPACE",
		"OBSERVABILITY_BACKENDS",
	}
	for _, key := range mlflowObsEnvKeys {
		base = append(base, corev1.EnvVar{
			Name: key,
			ValueFrom: &corev1.EnvVarSource{
				SecretKeyRef: &corev1.SecretKeySelector{
					LocalObjectReference: corev1.LocalObjectReference{Name: mlflowObsSecretName},
					Key:                  key,
					// Optional: keys the admin left out of the secret are simply unset.
					Optional: boolPtr(true),
				},
			},
		})
	}
	log.Printf("MLflow observability env vars configured via secretKeyRef for session %s", name)
}

// Add Vertex AI configuration only if enabled
if vertexEnabled {
base = append(base,
Expand Down Expand Up @@ -1474,15 +1587,23 @@ func appendNonConflictingEnvVars(base []corev1.EnvVar, extra []corev1.EnvVar) []
// replaceOrAppendEnvVars merges extra into base: replaces existing entries by name,
// or appends if the name does not exist. Used for the runner container where
// user-provided overrides are intentional.
// Entries in base that use ValueFrom (e.g. SecretKeyRef from the operator) are never
// replaced or cleared so spec.environmentVariables cannot override platform-injected secrets.
func replaceOrAppendEnvVars(base []corev1.EnvVar, extra []corev1.EnvVar) []corev1.EnvVar {
for _, ev := range extra {
replaced := false
for i := range base {
if base[i].Name == ev.Name {
base[i].Value = ev.Value
if base[i].Name != ev.Name {
continue
}
if base[i].ValueFrom != nil {
replaced = true
break
}
base[i].Value = ev.Value
base[i].ValueFrom = ev.ValueFrom
replaced = true
break
}
if !replaced {
base = append(base, ev)
Expand Down Expand Up @@ -1916,19 +2037,23 @@ func deletePodAndPerPodService(namespace, podName, sessionName string) error {
// Delete the ambient-vertex secret if it was copied by the operator
deleteCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := deleteAmbientVertexSecret(deleteCtx, namespace); err != nil {
if err := deleteAmbientVertexSecret(deleteCtx, namespace, sessionName); err != nil {
log.Printf("Failed to delete %s secret from %s: %v", types.AmbientVertexSecretName, namespace, err)
// Don't return error - this is a non-critical cleanup step
}

// Delete the Langfuse secret if it was copied by the operator
// This only deletes secrets copied by the operator (with CopiedFromAnnotation).
// The platform-wide ambient-admin-langfuse-secret in the operator namespace is never deleted.
if err := deleteAmbientLangfuseSecret(deleteCtx, namespace); err != nil {
if err := deleteAmbientLangfuseSecret(deleteCtx, namespace, sessionName); err != nil {
log.Printf("Failed to delete ambient-admin-langfuse-secret from %s: %v", namespace, err)
// Don't return error - this is a non-critical cleanup step
}

if err := deleteAmbientMlflowObservabilitySecret(deleteCtx, namespace, sessionName); err != nil {
log.Printf("Failed to delete ambient-admin-mlflow-observability-secret from %s: %v", namespace, err)
}

// NOTE: PVC is kept for all sessions and only deleted via garbage collection
// when the session CR is deleted. This allows sessions to be restarted.

Expand Down Expand Up @@ -2050,7 +2175,9 @@ func copySecretToNamespace(ctx context.Context, sourceSecret *corev1.Secret, tar

// deleteAmbientVertexSecret deletes the ambient-vertex secret from a namespace if it was copied
// and no other active sessions in the namespace still need it.
func deleteAmbientVertexSecret(ctx context.Context, namespace string) error {
// excludeSessionName, if non-empty, is omitted when counting Running/Creating/Pending sessions
// (e.g. the session currently being stopped still reports Running until status is patched).
func deleteAmbientVertexSecret(ctx context.Context, namespace, excludeSessionName string) error {
secret, err := config.K8sClient.CoreV1().Secrets(namespace).Get(ctx, types.AmbientVertexSecretName, v1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -2077,6 +2204,9 @@ func deleteAmbientVertexSecret(ctx context.Context, namespace string) error {

activeCount := 0
for _, session := range sessions.Items {
if excludeSessionName != "" && session.GetName() == excludeSessionName {
continue
}
status, _, _ := unstructured.NestedMap(session.Object, "status")
phase := ""
if status != nil {
Expand Down Expand Up @@ -2106,7 +2236,8 @@ func deleteAmbientVertexSecret(ctx context.Context, namespace string) error {

// deleteAmbientLangfuseSecret deletes the ambient-admin-langfuse-secret from a namespace if it was copied
// and no other active sessions in the namespace still need it.
func deleteAmbientLangfuseSecret(ctx context.Context, namespace string) error {
// excludeSessionName, if non-empty, is omitted when counting active sessions (see deleteAmbientVertexSecret).
func deleteAmbientLangfuseSecret(ctx context.Context, namespace, excludeSessionName string) error {
const langfuseSecretName = "ambient-admin-langfuse-secret"
secret, err := config.K8sClient.CoreV1().Secrets(namespace).Get(ctx, langfuseSecretName, v1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -2134,6 +2265,9 @@ func deleteAmbientLangfuseSecret(ctx context.Context, namespace string) error {

activeCount := 0
for _, session := range sessions.Items {
if excludeSessionName != "" && session.GetName() == excludeSessionName {
continue
}
status, _, _ := unstructured.NestedMap(session.Object, "status")
phase := ""
if status != nil {
Expand Down Expand Up @@ -2161,6 +2295,62 @@ func deleteAmbientLangfuseSecret(ctx context.Context, namespace string) error {
return nil
}

// deleteAmbientMlflowObservabilitySecret removes the operator-copied MLflow observability
// secret from a session namespace once no other active session remains there.
//
// The secret is left untouched when it lacks types.CopiedFromAnnotation, when the session
// list cannot be retrieved, or when any session other than excludeSessionName is in phase
// Running, Creating, or Pending. A secret that does not exist counts as success.
//
// excludeSessionName, if non-empty, names a session to leave out of the active count
// (see deleteAmbientVertexSecret).
//
// An error is returned only when the initial lookup or the final delete fails for a
// reason other than NotFound.
func deleteAmbientMlflowObservabilitySecret(ctx context.Context, namespace, excludeSessionName string) error {
	const mlflowObsSecretName = "ambient-admin-mlflow-observability-secret"

	existing, getErr := config.K8sClient.CoreV1().Secrets(namespace).Get(ctx, mlflowObsSecretName, v1.GetOptions{})
	if getErr != nil {
		if !errors.IsNotFound(getErr) {
			return fmt.Errorf("error checking for %s secret: %w", mlflowObsSecretName, getErr)
		}
		// Nothing to clean up.
		return nil
	}

	// Only secrets the operator copied carry this annotation; anything else is not ours to delete.
	_, copiedByOperator := existing.Annotations[types.CopiedFromAnnotation]
	if !copiedByOperator {
		log.Printf("%s secret in namespace %s was not copied by operator, not deleting", mlflowObsSecretName, namespace)
		return nil
	}

	sessionList, listErr := config.DynamicClient.Resource(types.GetAgenticSessionResource()).Namespace(namespace).List(ctx, v1.ListOptions{})
	if listErr != nil {
		// Without the session list we cannot tell whether the secret is still needed, so keep it.
		log.Printf("Warning: failed to list sessions in namespace %s, skipping secret deletion: %v", namespace, listErr)
		return nil
	}

	othersActive := 0
	for i := range sessionList.Items {
		item := &sessionList.Items[i]
		if excludeSessionName != "" && item.GetName() == excludeSessionName {
			continue
		}
		// A missing status map or a non-string phase both yield the empty phase.
		statusMap, _, _ := unstructured.NestedMap(item.Object, "status")
		phase, _ := statusMap["phase"].(string)
		switch phase {
		case "Running", "Creating", "Pending":
			othersActive++
		}
	}

	if othersActive > 0 {
		log.Printf("Skipping %s secret deletion in namespace %s: %d active session(s) may still need it", mlflowObsSecretName, namespace, othersActive)
		return nil
	}

	log.Printf("Deleting copied %s secret from namespace %s (no active sessions)", mlflowObsSecretName, namespace)
	if delErr := config.K8sClient.CoreV1().Secrets(namespace).Delete(ctx, mlflowObsSecretName, v1.DeleteOptions{}); delErr != nil && !errors.IsNotFound(delErr) {
		return fmt.Errorf("failed to delete %s secret: %w", mlflowObsSecretName, delErr)
	}

	return nil
}

// LEGACY: getBackendAPIURL removed - AG-UI migration
// Workflow and repo changes now call runner's REST endpoints directly

Expand Down Expand Up @@ -2268,6 +2458,13 @@ func regenerateRunnerToken(sessionNamespace, sessionName string, session *unstru
Resources: []string{"secrets"},
Verbs: []string{"get"},
},
{
// Kubeflow-style MLflow workspace: server validates bearer token against Experiment CRs.
// Adjust apiGroup/resource if your distribution uses a different MLflow CRD.
APIGroups: []string{"mlflow.kubeflow.org"},
Resources: []string{"experiments"},
Verbs: []string{"get", "list", "update"},
},
},
}
if _, err := config.K8sClient.RbacV1().Roles(sessionNamespace).Create(context.TODO(), role, v1.CreateOptions{}); err != nil {
Expand Down
Loading