Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Example: platform-wide MLflow tracing credentials for the operator to copy into session namespaces.
# Apply to the operator namespace (same pattern as ambient-admin-langfuse-secret).
#
# kubectl create secret generic ambient-admin-mlflow-observability-secret -n <operator-ns> \
# --from-literal=MLFLOW_TRACING_ENABLED=true \
# --from-literal=MLFLOW_TRACKING_URI=https://mlflow.example.com \
# --from-literal=MLFLOW_EXPERIMENT_NAME=ambient-code-sessions \
# --from-literal=OBSERVABILITY_BACKENDS=langfuse,mlflow
#
# Keys:
# MLFLOW_TRACING_ENABLED - "true" to enable secret copy + runner env injection
# MLFLOW_TRACKING_URI - required on the runner for MLflow tracing
# MLFLOW_EXPERIMENT_NAME - optional (runner default: ambient-code-sessions)
# OBSERVABILITY_BACKENDS - optional; comma list: langfuse, mlflow (runner default: langfuse only if unset)
apiVersion: v1
kind: Secret
metadata:
name: ambient-admin-mlflow-observability-secret
namespace: CHANGE_ME_OPERATOR_NAMESPACE
type: Opaque
stringData:
MLFLOW_TRACING_ENABLED: "true"
MLFLOW_TRACKING_URI: "https://mlflow.example.com"
MLFLOW_EXPERIMENT_NAME: "ambient-code-sessions"
OBSERVABILITY_BACKENDS: "langfuse,mlflow"
13 changes: 13 additions & 0 deletions components/manifests/base/core/operator-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ spec:
name: ambient-admin-langfuse-secret
key: LANGFUSE_SECRET_KEY
optional: true # Optional: only needed if Langfuse enabled
# MLflow tracing (runner); optional secret managed by platform admin
- name: MLFLOW_TRACING_ENABLED
valueFrom:
secretKeyRef:
name: ambient-admin-mlflow-observability-secret
key: MLFLOW_TRACING_ENABLED
optional: true
- name: OBSERVABILITY_BACKENDS
valueFrom:
secretKeyRef:
name: ambient-admin-mlflow-observability-secret
key: OBSERVABILITY_BACKENDS
optional: true
# Google OAuth client credentials for workspace-mcp
- name: GOOGLE_OAUTH_CLIENT_ID
valueFrom:
Expand Down
131 changes: 131 additions & 0 deletions components/operator/internal/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
// Continue - session cleanup is still successful
}

if err := deleteAmbientMlflowObservabilitySecret(deleteCtx, sessionNamespace); err != nil {
log.Printf("Warning: Failed to cleanup ambient-admin-mlflow-observability-secret from %s: %v", sessionNamespace, err)
}

return nil
}

Expand Down Expand Up @@ -576,6 +580,30 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
log.Printf("Langfuse disabled, skipping secret copy")
}

ambientMlflowObsSecretCopied := false
mlflowTracingEnabled := os.Getenv("MLFLOW_TRACING_ENABLED") != "" && os.Getenv("MLFLOW_TRACING_ENABLED") != "0" && os.Getenv("MLFLOW_TRACING_ENABLED") != "false"

if mlflowTracingEnabled {
const mlflowObsSecretName = "ambient-admin-mlflow-observability-secret"
if mlflowSecret, err := config.K8sClient.CoreV1().Secrets(operatorNamespace).Get(context.TODO(), mlflowObsSecretName, v1.GetOptions{}); err == nil {
log.Printf("Found %s in %s, copying to %s", mlflowObsSecretName, operatorNamespace, sessionNamespace)
copyCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := copySecretToNamespace(copyCtx, mlflowSecret, sessionNamespace, currentObj); err != nil {
log.Printf("Warning: Failed to copy MLflow observability secret: %v. MLflow tracing will be disabled for this session.", err)
} else {
ambientMlflowObsSecretCopied = true
log.Printf("Successfully copied %s to %s", mlflowObsSecretName, sessionNamespace)
}
} else if !errors.IsNotFound(err) {
log.Printf("Warning: Failed to check for %s in %s: %v. MLflow tracing may be disabled for this session.", mlflowObsSecretName, operatorNamespace, err)
} else {
log.Printf("Warning: MLFLOW_TRACING_ENABLED is set but %s not found in namespace %s. MLflow tracing will be disabled for this session.", mlflowObsSecretName, operatorNamespace)
}
} else {
log.Printf("MLflow tracing disabled, skipping MLflow observability secret copy")
}

// Create a Kubernetes Pod for this AgenticSession
podName := fmt.Sprintf("%s-runner", name)

Expand Down Expand Up @@ -1097,6 +1125,53 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
log.Printf("Langfuse env vars configured via secretKeyRef for session %s", name)
}

const mlflowObsSecretName = "ambient-admin-mlflow-observability-secret"
if ambientMlflowObsSecretCopied {
base = append(base,
corev1.EnvVar{
Name: "MLFLOW_TRACING_ENABLED",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: mlflowObsSecretName},
Key: "MLFLOW_TRACING_ENABLED",
Optional: boolPtr(true),
},
},
},
corev1.EnvVar{
Name: "MLFLOW_TRACKING_URI",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: mlflowObsSecretName},
Key: "MLFLOW_TRACKING_URI",
Optional: boolPtr(true),
},
},
},
corev1.EnvVar{
Name: "MLFLOW_EXPERIMENT_NAME",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: mlflowObsSecretName},
Key: "MLFLOW_EXPERIMENT_NAME",
Optional: boolPtr(true),
},
},
},
corev1.EnvVar{
Name: "OBSERVABILITY_BACKENDS",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: mlflowObsSecretName},
Key: "OBSERVABILITY_BACKENDS",
Optional: boolPtr(true),
},
},
},
)
log.Printf("MLflow observability env vars configured via secretKeyRef for session %s", name)
}

// Add Vertex AI configuration only if enabled
if vertexEnabled {
base = append(base,
Expand Down Expand Up @@ -1929,6 +2004,10 @@ func deletePodAndPerPodService(namespace, podName, sessionName string) error {
// Don't return error - this is a non-critical cleanup step
}

if err := deleteAmbientMlflowObservabilitySecret(deleteCtx, namespace); err != nil {
log.Printf("Failed to delete ambient-admin-mlflow-observability-secret from %s: %v", namespace, err)
}

// NOTE: PVC is kept for all sessions and only deleted via garbage collection
// when the session CR is deleted. This allows sessions to be restarted.

Expand Down Expand Up @@ -2161,6 +2240,58 @@ func deleteAmbientLangfuseSecret(ctx context.Context, namespace string) error {
return nil
}

// deleteAmbientMlflowObservabilitySecret deletes the copied MLflow observability secret from a
// session namespace when no other active sessions need it (same rules as Langfuse secret).
func deleteAmbientMlflowObservabilitySecret(ctx context.Context, namespace string) error {
const mlflowObsSecretName = "ambient-admin-mlflow-observability-secret"
secret, err := config.K8sClient.CoreV1().Secrets(namespace).Get(ctx, mlflowObsSecretName, v1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error checking for %s secret: %w", mlflowObsSecretName, err)
}

if _, ok := secret.Annotations[types.CopiedFromAnnotation]; !ok {
log.Printf("%s secret in namespace %s was not copied by operator, not deleting", mlflowObsSecretName, namespace)
return nil
}

gvr := types.GetAgenticSessionResource()
sessions, err := config.DynamicClient.Resource(gvr).Namespace(namespace).List(ctx, v1.ListOptions{})
if err != nil {
log.Printf("Warning: failed to list sessions in namespace %s, skipping secret deletion: %v", namespace, err)
return nil
}

activeCount := 0
for _, session := range sessions.Items {
status, _, _ := unstructured.NestedMap(session.Object, "status")
phase := ""
if status != nil {
if p, ok := status["phase"].(string); ok {
phase = p
}
}
if phase == "Running" || phase == "Creating" || phase == "Pending" {
activeCount++
}
}

if activeCount > 0 {
log.Printf("Skipping %s secret deletion in namespace %s: %d active session(s) may still need it", mlflowObsSecretName, namespace, activeCount)
return nil
}

log.Printf("Deleting copied %s secret from namespace %s (no active sessions)", mlflowObsSecretName, namespace)
err = config.K8sClient.CoreV1().Secrets(namespace).Delete(ctx, mlflowObsSecretName, v1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete %s secret: %w", mlflowObsSecretName, err)
}

return nil
}

// LEGACY: getBackendAPIURL removed - AG-UI migration
// Workflow and repo changes now call runner's REST endpoints directly

Expand Down
23 changes: 22 additions & 1 deletion components/runners/ambient-runner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The runner follows the [Ambient Runner SDK architecture (ADR-0006)](../../../doc
- **`auth.py`** - SDK authentication (API key, Vertex AI) and runtime credentials
- **`workspace.py`** - Workspace validation and path resolution
- **`mcp.py`** - MCP server configuration and tool allowlisting
- **`observability.py`** - Langfuse integration for tracking usage and performance
- **`observability.py`** - Langfuse + optional MLflow tracing (`observability_config.py`, `mlflow_observability.py`, `observability_privacy.py`)
- **`context.py`** - Runner context for session and workspace management
- **`security_utils.py`** - Security utilities for sanitizing secrets and timeouts

Expand Down Expand Up @@ -128,9 +128,30 @@ The workspace context prompt is built by `build_sdk_system_prompt()` in `prompts

### Observability

**Langfuse** (unchanged defaults when `OBSERVABILITY_BACKENDS` is unset — Langfuse-only):

- `LANGFUSE_ENABLED` - Enable Langfuse (`true` / `1`)
- `LANGFUSE_PUBLIC_KEY` - Langfuse public key
- `LANGFUSE_SECRET_KEY` - Langfuse secret key
- `LANGFUSE_HOST` - Langfuse host URL
- `LANGFUSE_MASK_MESSAGES` - Redact message bodies (`true` default; shared with MLflow path)

**Backend selection**

- `OBSERVABILITY_BACKENDS` - Comma-separated: `langfuse`, `mlflow`, or both (e.g. `langfuse,mlflow`). If unset, defaults to **`langfuse`** only so existing Langfuse behaviour is preserved.

**MLflow GenAI tracing** (optional extra: `pip install 'ambient-runner[mlflow-observability]'`)

- `MLFLOW_TRACING_ENABLED` - Must be `true` / `1` together with `mlflow` in `OBSERVABILITY_BACKENDS`
- `MLFLOW_TRACKING_URI` - MLflow tracking server URI (e.g. `https://mlflow.example.com` or `file:./mlruns` for local tests)
- `MLFLOW_EXPERIMENT_NAME` - Experiment name (default: `ambient-code-sessions`)

**OTLP export from MLflow** (no code changes — configure before the process creates spans; see [MLflow OTLP export](https://mlflow.org/docs/latest/genai/tracing/opentelemetry/export/)):

- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` - e.g. `http://otel-collector:4318/v1/traces` with `opentelemetry-exporter-otlp` installed
- `OTEL_SERVICE_NAME` - Service name on exported spans
- `MLFLOW_TRACE_ENABLE_OTLP_DUAL_EXPORT=true` - Send traces to **both** the MLflow Tracking Server and OTLP
- Optional: `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`, `OTEL_EXPORTER_OTLP_TRACES_HEADERS`

### Backend Integration

Expand Down
5 changes: 4 additions & 1 deletion components/runners/ambient-runner/ambient_runner/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ and platform services. Framework-specific logic lives in bridge subpackages.
ambient_runner/
├── app.py # create_ambient_app(), run_ambient_app(), add_ambient_endpoints()
├── bridge.py # PlatformBridge ABC, FrameworkCapabilities, RunnerContext import
├── observability.py # ObservabilityManager (Langfuse, ~900 lines)
├── observability.py # ObservabilityManager (Langfuse + optional MLflow)
├── observability_config.py # OBSERVABILITY_BACKENDS, use_*_backend()
├── observability_privacy.py # Shared message masking (LANGFUSE_MASK_MESSAGES)
├── mlflow_observability.py # MLflowSessionTracer (parallel spans)
├── bridges/
│ ├── claude/ # Claude Agent SDK bridge
│ │ ├── bridge.py # ClaudeBridge class (full lifecycle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ def __init__(self) -> None:
# ------------------------------------------------------------------

def capabilities(self) -> FrameworkCapabilities:
has_tracing = (
self._obs is not None
and hasattr(self._obs, "langfuse_client")
and self._obs.langfuse_client is not None
)
tracing_label = None
if self._obs is not None:
cap = getattr(self._obs, "tracing_capability_label", None)
if isinstance(cap, str) and cap:
tracing_label = cap
elif getattr(self._obs, "langfuse_client", None):
tracing_label = "langfuse"
elif getattr(self._obs, "mlflow_tracing_active", False):
tracing_label = "mlflow"
return FrameworkCapabilities(
framework="claude-agent-sdk",
agent_features=[
Expand All @@ -90,7 +94,7 @@ def capabilities(self) -> FrameworkCapabilities:
],
file_system=True,
mcp=True,
tracing="langfuse" if has_tracing else None,
tracing=tracing_label,
session_persistence=True,
)

Expand Down
Loading
Loading