diff --git a/.github/workflows/components-build-deploy.yml b/.github/workflows/components-build-deploy.yml index e389c9c12..632568114 100644 --- a/.github/workflows/components-build-deploy.yml +++ b/.github/workflows/components-build-deploy.yml @@ -252,13 +252,13 @@ jobs: if [ "${{ needs.detect-changes.outputs.claude-runner }}" == "true" ]; then echo "runner_tag=${{ github.sha }}" >> $GITHUB_OUTPUT else - echo "runner_tag=stage" >> $GITHUB_OUTPUT + echo "runner_tag=latest" >> $GITHUB_OUTPUT fi if [ "${{ needs.detect-changes.outputs.state-sync }}" == "true" ]; then echo "state_sync_tag=${{ github.sha }}" >> $GITHUB_OUTPUT else - echo "state_sync_tag=stage" >> $GITHUB_OUTPUT + echo "state_sync_tag=latest" >> $GITHUB_OUTPUT fi if [ "${{ needs.detect-changes.outputs.public-api }}" == "true" ]; then @@ -303,6 +303,27 @@ jobs: AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:${{ steps.image-tags.outputs.runner_tag }}" \ STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:${{ steps.image-tags.outputs.state_sync_tag }}" + - name: Update agent registry ConfigMap with pinned image tags + if: needs.detect-changes.outputs.claude-runner == 'true' || needs.detect-changes.outputs.state-sync == 'true' + run: | + # Fetch live JSON from cluster so unchanged images keep their current tag + REGISTRY=$(oc get configmap ambient-agent-registry -n ambient-code \ + -o jsonpath='{.data.agent-registry\.json}') + + # Only replace images that were actually rebuilt this run. + # Pattern [@:][^"]* matches both :tag and @sha256:digest refs. + if [ "${{ needs.detect-changes.outputs.claude-runner }}" == "true" ]; then + REGISTRY=$(echo "$REGISTRY" | sed \ + "s|quay.io/ambient_code/vteam_claude_runner[@:][^\"]*|quay.io/ambient_code/vteam_claude_runner:${{ github.sha }}|g") + fi + if [ "${{ needs.detect-changes.outputs.state-sync }}" == "true" ]; then + REGISTRY=$(echo "$REGISTRY" | sed \ + "s|quay.io/ambient_code/vteam_state_sync[@:][^\"]*|quay.io/ambient_code/vteam_state_sync:${{ github.sha }}|g") + fi + + oc patch configmap ambient-agent-registry -n ambient-code --type=merge \ + -p "{\"data\":{\"agent-registry.json\":$(echo "$REGISTRY" | jq -Rs .)}}" + deploy-with-disptach: runs-on: ubuntu-latest needs: [detect-changes, build-and-push, update-rbac-and-crd] @@ -357,5 +378,15 @@ jobs: - name: Update operator environment variables run: | oc set env deployment/agentic-operator -n ambient-code -c agentic-operator \ - AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:stage" \ - STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:stage" + AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:${{ github.sha }}" \ + STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:${{ github.sha }}" + + - name: Update agent registry ConfigMap with pinned image tags + run: | + REGISTRY=$(oc get configmap ambient-agent-registry -n ambient-code \ + -o jsonpath='{.data.agent-registry\.json}') + REGISTRY=$(echo "$REGISTRY" | sed \ + -e "s|quay.io/ambient_code/vteam_claude_runner[@:][^\"]*|quay.io/ambient_code/vteam_claude_runner:${{ github.sha }}|g" \ + -e "s|quay.io/ambient_code/vteam_state_sync[@:][^\"]*|quay.io/ambient_code/vteam_state_sync:${{ github.sha }}|g") + oc patch configmap ambient-agent-registry -n ambient-code --type=merge \ + -p "{\"data\":{\"agent-registry.json\":$(echo "$REGISTRY" | jq -Rs .)}}" diff --git a/.github/workflows/prod-release-deploy.yaml b/.github/workflows/prod-release-deploy.yaml index ab1b51993..ff9285479 100644 --- a/.github/workflows/prod-release-deploy.yaml +++ b/.github/workflows/prod-release-deploy.yaml @@ -22,6 +22,10 @@ on: required: false type: string default: '' +concurrency: + group: prod-release-deploy + cancel-in-progress: false + jobs: release: runs-on: ubuntu-latest @@ -273,3 +277,14 @@ jobs: oc set env deployment/agentic-operator -n ambient-code -c agentic-operator \ AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:${{ needs.release.outputs.new_tag }}" \ STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:${{ needs.release.outputs.new_tag }}" + + - name: Update agent registry ConfigMap with release image tags + run: | + RELEASE_TAG="${{ needs.release.outputs.new_tag }}" + REGISTRY=$(oc get configmap ambient-agent-registry -n ambient-code \ + -o jsonpath='{.data.agent-registry\.json}') + REGISTRY=$(echo "$REGISTRY" | sed \ + -e "s|quay.io/ambient_code/vteam_claude_runner[@:][^\"]*|quay.io/ambient_code/vteam_claude_runner:${RELEASE_TAG}|g" \ + -e "s|quay.io/ambient_code/vteam_state_sync[@:][^\"]*|quay.io/ambient_code/vteam_state_sync:${RELEASE_TAG}|g") + oc patch configmap ambient-agent-registry -n ambient-code --type=merge \ + -p "{\"data\":{\"agent-registry.json\":$(echo "$REGISTRY" | jq -Rs .)}}" diff --git a/components/runners/ambient-runner/ambient_runner/bridge.py b/components/runners/ambient-runner/ambient_runner/bridge.py index 9935945e4..4a5762473 100644 --- a/components/runners/ambient-runner/ambient_runner/bridge.py +++ b/components/runners/ambient-runner/ambient_runner/bridge.py @@ -23,8 +23,10 @@ async def interrupt(self, thread_id=None) -> None: pass """ +import asyncio import logging import os +import time from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Any, AsyncIterator, Optional @@ -43,6 +45,30 @@ async def interrupt(self, thread_id=None) -> None: TOOL_REFRESH_MIN_INTERVAL_SEC = 30 +def _async_safe_manager_shutdown(manager: Any) -> None: + """Fire-and-forget async shutdown of a session manager from sync context. + + Used by ``mark_dirty()`` implementations in all bridges. Handles both + cases: called from within a running event loop (schedules as a task) + and called outside any loop (blocks via ``asyncio.run``). + """ + try: + loop = asyncio.get_running_loop() + task = loop.create_task(manager.shutdown()) + task.add_done_callback( + lambda f: _bridge_logger.warning( + "mark_dirty: session_manager shutdown error: %s", f.exception() + ) + if f.exception() + else None + ) + except RuntimeError: + try: + asyncio.run(manager.shutdown()) + except Exception as exc: + _bridge_logger.warning("mark_dirty: session_manager shutdown error: %s", exc) + + @dataclass class FrameworkCapabilities: """Declares what a framework adapter supports. @@ -77,6 +103,11 @@ class PlatformBridge(ABC): - ``get_error_context()`` — returns extra error info for failed runs """ + def __init__(self) -> None: + self._context: Optional[RunnerContext] = None + self._ready: bool = False + self._last_creds_refresh: float = 0.0 + # ------------------------------------------------------------------ # Required (abstract) # ------------------------------------------------------------------ @@ -116,11 +147,39 @@ async def interrupt(self, thread_id: Optional[str] = None) -> None: # ------------------------------------------------------------------ def set_context(self, context: RunnerContext) -> None: - """Store the runner context for later use. + """Store the runner context (called from lifespan before any requests).""" + self._context = context - Called by the platform lifespan before any requests are served. - Override to capture the context for use in ``run()``. + async def _refresh_credentials_if_stale(self) -> None: + """Refresh platform credentials if the refresh interval has elapsed. + + Call this at the start of each ``run()`` to keep tokens fresh. """ + now = time.monotonic() + if now - self._last_creds_refresh > CREDS_REFRESH_INTERVAL_SEC: + from ambient_runner.platform.auth import populate_runtime_credentials + + await populate_runtime_credentials(self._context) + self._last_creds_refresh = now + + async def _ensure_ready(self) -> None: + """Run one-time platform setup on the first ``run()`` call. + + Calls ``_setup_platform()`` the first time, then sets ``self._ready``. + """ + if self._ready: + return + if not self._context: + raise RuntimeError("Context not set — call set_context() first") + await self._setup_platform() + self._ready = True + _bridge_logger.info( + "Platform ready — model: %s", + getattr(self, "_configured_model", ""), + ) + + async def _setup_platform(self) -> None: + """Framework-specific platform setup. Override in each bridge.""" pass async def shutdown(self) -> None: diff --git a/components/runners/ambient-runner/ambient_runner/bridges/claude/auth.py b/components/runners/ambient-runner/ambient_runner/bridges/claude/auth.py index 0e817405d..a1f4dd6e9 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/claude/auth.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/claude/auth.py @@ -9,8 +9,8 @@ import logging import os -from pathlib import Path +from ambient_runner.platform.auth import validate_vertex_credentials_file from ambient_runner.platform.context import RunnerContext from ambient_runner.platform.utils import is_vertex_enabled @@ -38,14 +38,10 @@ def map_to_vertex_model(model: str) -> str: async def setup_vertex_credentials(context: RunnerContext) -> dict: """Set up Google Cloud Vertex AI credentials from service account.""" - service_account_path = context.get_env("GOOGLE_APPLICATION_CREDENTIALS", "").strip() + service_account_path = validate_vertex_credentials_file(context) project_id = context.get_env("ANTHROPIC_VERTEX_PROJECT_ID", "").strip() region = context.get_env("CLOUD_ML_REGION", "").strip() - if not service_account_path: - raise RuntimeError( - "GOOGLE_APPLICATION_CREDENTIALS must be set when USE_VERTEX is enabled" - ) if not project_id: raise RuntimeError( "ANTHROPIC_VERTEX_PROJECT_ID must be set when USE_VERTEX is enabled" @@ -53,11 +49,6 @@ async def setup_vertex_credentials(context: RunnerContext) -> dict: if not region: raise RuntimeError("CLOUD_ML_REGION must be set when USE_VERTEX is enabled") - if not Path(service_account_path).exists(): - raise RuntimeError( - f"Service account key file not found at {service_account_path}" - ) - logger.info(f"Vertex AI configured: project={project_id}, region={region}") return { "credentials_path": service_account_path, diff --git a/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py b/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py index cf0108082..4005bec8d 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py @@ -9,7 +9,6 @@ - Interrupt and graceful shutdown """ -import asyncio import logging import os import time @@ -19,9 +18,9 @@ from ag_ui_claude_sdk import ClaudeAgentAdapter from ambient_runner.bridge import ( - CREDS_REFRESH_INTERVAL_SEC, FrameworkCapabilities, PlatformBridge, + _async_safe_manager_shutdown, setup_bridge_observability, ) from ambient_runner.bridges.claude.session import SessionManager @@ -43,13 +42,12 @@ class ClaudeBridge(PlatformBridge): """ def __init__(self) -> None: + super().__init__() self._adapter: ClaudeAgentAdapter | None = None self._session_manager: SessionManager | None = None self._obs: Any = None - self._context: RunnerContext | None = None # Platform state (populated by _setup_platform) - self._ready: bool = False self._first_run: bool = True self._configured_model: str = "" self._cwd_path: str = "" @@ -58,7 +56,6 @@ def __init__(self) -> None: self._allowed_tools: list[str] = [] self._system_prompt: dict = {} self._stderr_lines: list[str] = [] - self._last_creds_refresh: float = 0.0 # ------------------------------------------------------------------ # PlatformBridge interface @@ -89,14 +86,7 @@ async def run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: """Full run lifecycle: lazy setup → adapter → session worker → tracing.""" # 1. Lazy platform setup await self._ensure_ready() - - # Refresh credentials if stale (tokens may have expired) - now = time.monotonic() - if now - self._last_creds_refresh > CREDS_REFRESH_INTERVAL_SEC: - from ambient_runner.platform.auth import populate_runtime_credentials - - await populate_runtime_credentials(self._context) - self._last_creds_refresh = now + await self._refresh_credentials_if_stale() # 2. Ensure adapter exists self._ensure_adapter() @@ -153,10 +143,6 @@ async def interrupt(self, thread_id: Optional[str] = None) -> None: # Lifecycle methods # ------------------------------------------------------------------ - def set_context(self, context: RunnerContext) -> None: - """Store the runner context (called from lifespan).""" - self._context = context - async def shutdown(self) -> None: """Graceful shutdown: persist sessions, finalise tracing.""" if self._session_manager: @@ -179,22 +165,7 @@ def mark_dirty(self) -> None: if self._session_manager: manager = self._session_manager self._session_manager = None - try: - asyncio.get_running_loop() # raises RuntimeError if no loop - future = asyncio.ensure_future(manager.shutdown()) - future.add_done_callback( - lambda f: logger.warning( - "mark_dirty: session_manager shutdown error: %s", f.exception() - ) - if f.exception() - else None - ) - except RuntimeError: - # No running loop — safe to block - try: - asyncio.run(manager.shutdown()) - except Exception as e: - logger.warning("mark_dirty: session_manager shutdown error: %s", e) + _async_safe_manager_shutdown(manager) logger.info("ClaudeBridge: marked dirty — will reinitialise on next run") def get_error_context(self) -> str: @@ -300,18 +271,6 @@ def session_manager(self) -> SessionManager | None: # Private: platform setup (lazy, called on first run) # ------------------------------------------------------------------ - async def _ensure_ready(self) -> None: - """Run one-time platform setup if not already done.""" - if self._ready: - return - if not self._context: - raise RuntimeError("Context not set — call set_context() first") - await self._setup_platform() - self._ready = True - logger.info( - f"Platform ready — model: {self._configured_model}, cwd: {self._cwd_path}" - ) - async def _setup_platform(self) -> None: """Full platform setup: auth, workspace, MCP, observability.""" # Session manager diff --git a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/auth.py b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/auth.py index 9409cdfac..aa3dc79f2 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/auth.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/auth.py @@ -2,6 +2,7 @@ import logging +from ambient_runner.platform.auth import validate_vertex_credentials_file from ambient_runner.platform.context import RunnerContext from ambient_runner.platform.utils import is_vertex_enabled @@ -26,8 +27,19 @@ async def setup_gemini_cli_auth(context: RunnerContext) -> tuple[str, str, bool] use_vertex = is_vertex_enabled(legacy_var="GEMINI_USE_VERTEX", context=context) if use_vertex: - project = context.get_env("GOOGLE_CLOUD_PROJECT", "").strip() - location = context.get_env("GOOGLE_CLOUD_LOCATION", "").strip() + validate_vertex_credentials_file(context) + + # Resolve project/location — may be set directly or via platform aliases + # (ANTHROPIC_VERTEX_PROJECT_ID / CLOUD_ML_REGION). The subprocess mapping + # in session.py handles the latter; here we just log what we found. + project = ( + context.get_env("GOOGLE_CLOUD_PROJECT", "").strip() + or context.get_env("ANTHROPIC_VERTEX_PROJECT_ID", "").strip() + ) + location = ( + context.get_env("GOOGLE_CLOUD_LOCATION", "").strip() + or context.get_env("CLOUD_ML_REGION", "").strip() + ) logger.info( "Gemini CLI: Vertex AI mode (project=%s, location=%s, model=%s)", diff --git a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/bridge.py b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/bridge.py index 6352da443..77915f922 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/bridge.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/bridge.py @@ -19,9 +19,9 @@ from ag_ui_gemini_cli.utils import extract_user_message from ambient_runner.bridge import ( - CREDS_REFRESH_INTERVAL_SEC, FrameworkCapabilities, PlatformBridge, + _async_safe_manager_shutdown, setup_bridge_observability, ) from ambient_runner.bridges.gemini_cli.session import ( @@ -42,13 +42,12 @@ class GeminiCLIBridge(PlatformBridge): """ def __init__(self) -> None: + super().__init__() self._session_manager: GeminiSessionManager | None = None self._adapter: GeminiCLIAdapter | None = None self._obs: Any = None - self._context: RunnerContext | None = None # Platform state (populated by _setup_platform) - self._ready: bool = False self._configured_model: str = "" self._api_key: str = "" self._use_vertex: bool = False @@ -56,7 +55,6 @@ def __init__(self) -> None: self._include_directories: list[str] = [] self._mcp_settings_path: str | None = None self._mcp_status_cache: dict | None = None - self._last_creds_refresh: float = 0.0 # ------------------------------------------------------------------ # PlatformBridge interface @@ -80,14 +78,7 @@ async def run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: """Full run lifecycle: lazy setup -> session worker -> tracing.""" # 1. Lazy platform setup await self._ensure_ready() - - # Refresh credentials if stale - now = time.monotonic() - if now - self._last_creds_refresh > CREDS_REFRESH_INTERVAL_SEC: - from ambient_runner.platform.auth import populate_runtime_credentials - - await populate_runtime_credentials(self._context) - self._last_creds_refresh = now + await self._refresh_credentials_if_stale() # 2. Extract user message user_msg = extract_user_message(input_data) @@ -157,10 +148,6 @@ async def interrupt(self, thread_id: Optional[str] = None) -> None: # Lifecycle methods # ------------------------------------------------------------------ - def set_context(self, context: RunnerContext) -> None: - """Store the runner context (called from lifespan).""" - self._context = context - async def shutdown(self) -> None: """Graceful shutdown: stop workers, finalise tracing.""" if self._session_manager: @@ -186,22 +173,7 @@ def mark_dirty(self) -> None: if self._session_manager: manager = self._session_manager self._session_manager = None - try: - asyncio.get_running_loop() - future = asyncio.ensure_future(manager.shutdown()) - future.add_done_callback( - lambda f: logger.warning( - "mark_dirty: session_manager shutdown error: %s", - f.exception(), - ) - if f.exception() - else None - ) - except RuntimeError: - try: - asyncio.run(manager.shutdown()) - except Exception as e: - logger.warning("mark_dirty: session_manager shutdown error: %s", e) + _async_safe_manager_shutdown(manager) logger.info("GeminiCLIBridge: marked dirty -- will reinitialise on next run") def get_error_context(self) -> str: @@ -286,20 +258,6 @@ def obs(self) -> Any: # Private: platform setup (lazy, called on first run) # ------------------------------------------------------------------ - async def _ensure_ready(self) -> None: - """Run one-time platform setup if not already done.""" - if self._ready: - return - if not self._context: - raise RuntimeError("Context not set -- call set_context() first") - await self._setup_platform() - self._ready = True - logger.info( - "Platform ready -- model: %s, cwd: %s", - self._configured_model, - self._cwd_path, - ) - async def _setup_platform(self) -> None: """Full platform setup: auth, workspace, observability.""" # Session manager with state dir for session_id persistence across restarts @@ -322,22 +280,29 @@ async def _setup_platform(self) -> None: self._last_creds_refresh = time.monotonic() # Workspace paths - cwd_path, _add_dirs = resolve_workspace_paths(self._context) + cwd_path, add_dirs = resolve_workspace_paths(self._context) # Observability self._obs = await setup_bridge_observability(self._context, model) # MCP servers — write .gemini/settings.json so the CLI discovers them from ambient_runner.bridges.gemini_cli.mcp import setup_gemini_mcp + from ambient_runner.bridges.gemini_cli.system_prompt import write_gemini_system_prompt mcp_settings_path = setup_gemini_mcp(self._context, cwd_path) - # Build include directories (repos, uploads, artifacts, file-uploads) + # System prompt — write .gemini/system.md and set GEMINI_SYSTEM_MD=true. + # Uses ${AgentSkills} / ${AvailableTools} substitution to preserve + # Gemini's built-in instructions, then appends platform context. + write_gemini_system_prompt(cwd_path) + + # Build include directories: platform-provided dirs (repos, workflows, + # file-uploads) plus well-known workspace subdirs, excluding cwd itself. workspace = os.getenv("WORKSPACE_PATH", "/workspace") - include_dirs = [] + include_dirs = list(add_dirs) if add_dirs else [] for subdir in ["repos", "artifacts", "file-uploads"]: d = os.path.join(workspace, subdir) - if os.path.isdir(d) and d != cwd_path: + if os.path.isdir(d) and d != cwd_path and d not in include_dirs: include_dirs.append(d) # Store results diff --git a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/commands.py b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/commands.py new file mode 100644 index 000000000..d6af96313 --- /dev/null +++ b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/commands.py @@ -0,0 +1,95 @@ +"""Write Gemini CLI custom commands for ambient platform features. + +Custom commands are TOML files discovered from: + - ~/.gemini/commands/ (global) + - /.gemini/commands/ (project-level, takes precedence) + +We write platform commands to the project-level commands dir so they are +scoped to the session workspace and don't pollute the container's home. + +Commands written: + /ambient:evaluate-rubric — evaluate output against .ambient/rubric.md + /ambient:log-correction — log a correction to the feedback loop +""" + +import logging +from pathlib import Path + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Command definitions +# --------------------------------------------------------------------------- + +_EVALUATE_RUBRIC_TOML = '''\ +description = "Evaluate session output against the workflow rubric in .ambient/rubric.md" +prompt = """ +@{.ambient/rubric.md} + +--- + +Evaluate the completed session output against each criterion in the rubric above. + +Then call the `evaluate_rubric` tool (from the ambient-feedback MCP server) with: +- score: your overall numeric evaluation score +- comment: your evaluation reasoning, covering each criterion + +Provide honest, calibrated scores with clear reasoning. Always read the rubric +before scoring — the file is embedded above. + +{{args}} +""" +''' + +_LOG_CORRECTION_TOML = '''\ +description = "Log a correction to the improvement feedback loop before fixing the issue" +prompt = """ +The user has corrected something you did or assumed. BEFORE fixing the issue, +call the `log_correction` tool (from the ambient-feedback MCP server) with: + +- correction_type: one of: + "incomplete" — missed something that should have been done + "incorrect" — did the wrong thing + "out_of_scope" — worked on wrong files or area + "style" — right result but wrong approach or pattern +- agent_action: what you did or assumed (be specific and honest) +- user_correction: exactly what the user said should have happened instead +- source: "human" + +Use broad judgment — if the user is steering you away from something you already +did or decided, that is a correction. When in doubt, log it first, then fix. + +{{args}} +""" +''' + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def write_gemini_commands(cwd_path: str) -> None: + """Write ambient custom commands to the project-level .gemini/commands/ dir. + + Args: + cwd_path: The session working directory (project root for Gemini CLI). + """ + commands_dir = Path(cwd_path) / ".gemini" / "commands" / "ambient" + try: + commands_dir.mkdir(parents=True, exist_ok=True) + except OSError as exc: + logger.warning("Could not create commands dir %s: %s", commands_dir, exc) + return + + _write_command(commands_dir / "evaluate-rubric.toml", _EVALUATE_RUBRIC_TOML) + _write_command(commands_dir / "log-correction.toml", _LOG_CORRECTION_TOML) + + logger.info("Wrote Gemini CLI custom commands to %s", commands_dir) + + +def _write_command(path: Path, content: str) -> None: + try: + path.write_text(content, encoding="utf-8") + path.chmod(0o644) + except OSError as exc: + logger.warning("Could not write command file %s: %s", path, exc) diff --git a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/feedback_server.py b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/feedback_server.py new file mode 100644 index 000000000..9ae9d2d18 --- /dev/null +++ b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/feedback_server.py @@ -0,0 +1,206 @@ +"""Minimal stdio MCP server exposing evaluate_rubric and log_correction tools. + +Runs as a subprocess spawned by the Gemini CLI. Communicates via JSON-RPC 2.0 +over stdin/stdout (MCP stdio transport). Uses the platform feedback layer so +rubric scores and corrections land in Langfuse regardless of which runner is +in use — without depending on Claude-bridge internals. + +Usage (registered in .gemini/settings.json): + "command": "python", + "args": ["-m", "ambient_runner.bridges.gemini_cli.feedback_server"] +""" + +import json +import logging +import os +import sys + +from ambient_runner.platform.feedback import CORRECTION_SOURCES, CORRECTION_TYPES + +# Keep this server quiet — all output goes to stdout which is the MCP channel. +logging.basicConfig(level=logging.WARNING, stream=sys.stderr) + +# --------------------------------------------------------------------------- +# Tool definitions (MCP schema format) +# --------------------------------------------------------------------------- + +_TOOLS = [ + { + "name": "evaluate_rubric", + "description": ( + "Log a rubric evaluation score to Langfuse. " + "Read .ambient/rubric.md FIRST, evaluate the output against each " + "criterion, then call this tool with your score and reasoning." + ), + "inputSchema": { + "type": "object", + "properties": { + "score": { + "type": "number", + "description": "Overall evaluation score.", + }, + "comment": { + "type": "string", + "description": "Evaluation reasoning and commentary.", + }, + }, + "required": ["score", "comment"], + }, + }, + { + "name": "log_correction", + "description": ( + "Log a correction whenever the user redirects, corrects, or changes " + "what you did or assumed. Call this BEFORE fixing the issue. " + "Use broad judgment — if the user is steering you away from a previous " + "action or decision, log it. When in doubt, log it." + ), + "inputSchema": { + "type": "object", + "properties": { + "correction_type": { + "type": "string", + "enum": CORRECTION_TYPES, + "description": ( + "incomplete=missed something, incorrect=did the wrong thing, " + "out_of_scope=wrong files/area, style=right result wrong approach." + ), + }, + "agent_action": { + "type": "string", + "description": "What the agent did or assumed (be specific and honest).", + }, + "user_correction": { + "type": "string", + "description": "What the user said should have happened instead.", + }, + "source": { + "type": "string", + "enum": CORRECTION_SOURCES, + "description": "'human' for user corrections, 'rubric' after rubric evaluation.", + }, + }, + "required": ["correction_type", "agent_action", "user_correction"], + }, + }, +] + +# --------------------------------------------------------------------------- +# JSON-RPC helpers +# --------------------------------------------------------------------------- + + +def _send(msg: dict) -> None: + sys.stdout.write(json.dumps(msg) + "\n") + sys.stdout.flush() + + +def _respond(msg_id, result: dict) -> None: + _send({"jsonrpc": "2.0", "id": msg_id, "result": result}) + + +def _error(msg_id, code: int, message: str) -> None: + _send({"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}}) + + +# --------------------------------------------------------------------------- +# Tool handlers +# --------------------------------------------------------------------------- + +def _handle_evaluate_rubric(args: dict) -> dict: + from ambient_runner.platform.feedback import log_rubric_score + + session_id = os.getenv("AGENTIC_SESSION_NAME", "unknown") + score = args.get("score") + comment = args.get("comment", "") + + success, err = log_rubric_score(score=score, comment=comment, session_id=session_id) + if success: + return {"content": [{"type": "text", "text": f"Score {score} logged to Langfuse."}]} + return { + "content": [{"type": "text", "text": f"Failed to log score: {err}"}], + "isError": True, + } + + +def _handle_log_correction(args: dict) -> dict: + from ambient_runner.platform.feedback import log_correction + + session_id = os.getenv("AGENTIC_SESSION_NAME", "unknown") + + success, err = log_correction( + correction_type=args.get("correction_type", ""), + agent_action=args.get("agent_action", ""), + user_correction=args.get("user_correction", ""), + target_label=args.get("target", ""), + session_id=session_id, + source=args.get("source", "human"), + ) + if success: + return {"content": [{"type": "text", "text": "Correction logged successfully."}]} + return { + "content": [{"type": "text", "text": f"Failed to log correction: {err}"}], + "isError": True, + } + + +# Single source of truth: tool name → handler. Must stay in sync with _TOOLS names. +_TOOL_HANDLERS = { + "evaluate_rubric": _handle_evaluate_rubric, + "log_correction": _handle_log_correction, +} + + +def _dispatch_tool(params: dict) -> dict: + name = params.get("name", "") + handler = _TOOL_HANDLERS.get(name) + if handler is None: + return { + "content": [{"type": "text", "text": f"Unknown tool: {name}"}], + "isError": True, + } + return handler(params.get("arguments", {})) + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- + + +def main() -> None: + for raw in sys.stdin: + raw = raw.strip() + if not raw: + continue + try: + msg = json.loads(raw) + except json.JSONDecodeError: + continue + + method = msg.get("method", "") + msg_id = msg.get("id") + params = msg.get("params") or {} + + if method == "initialize": + _respond( + msg_id, + { + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "ambient-feedback", "version": "1.0.0"}, + }, + ) + elif method == "initialized": + pass # notification — no response + elif method == "tools/list": + _respond(msg_id, {"tools": _TOOLS}) + elif method == "tools/call": + _respond(msg_id, _dispatch_tool(params)) + elif method == "ping": + _respond(msg_id, {}) + elif msg_id is not None: + _error(msg_id, -32601, f"Method not found: {method}") + + +if __name__ == "__main__": + main() diff --git a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/mcp.py b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/mcp.py index d0d162d24..2696d9d4c 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/mcp.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/mcp.py @@ -13,6 +13,7 @@ import json import logging +import os from pathlib import Path from typing import Optional @@ -102,11 +103,46 @@ def write_gemini_settings( return abs_path +def _build_feedback_server_entry() -> dict: + """Build the ambient-feedback MCP server entry with Langfuse credentials injected. + + The Gemini CLI strips LANGFUSE_* vars from the subprocess environment via + the blocklist in session.py (they're runner-internal secrets the AI model + shouldn't see). MCP server entries can declare their own ``env`` block which + the CLI passes directly to the server subprocess, bypassing the blocklist. + We use this to give the feedback_server the Langfuse credentials it needs. + """ + env: dict[str, str] = {} + for key in ( + "LANGFUSE_ENABLED", + "LANGFUSE_PUBLIC_KEY", + "LANGFUSE_SECRET_KEY", + "LANGFUSE_HOST", + "AGENTIC_SESSION_NAME", + "AGENTIC_SESSION_NAMESPACE", + "REPOS_JSON", + "ACTIVE_WORKFLOW_GIT_URL", + "ACTIVE_WORKFLOW_BRANCH", + "ACTIVE_WORKFLOW_PATH", + ): + val = os.environ.get(key, "") + if val: + env[key] = val + + entry: dict = { + "command": "python", + "args": ["-m", "ambient_runner.bridges.gemini_cli.feedback_server"], + } + if env: + entry["env"] = env + return entry + + def setup_gemini_mcp( context: RunnerContext, cwd_path: str, ) -> Optional[str]: - """End-to-end MCP setup: load config, write settings file. + """End-to-end MCP setup: load config, write settings file, write commands. Args: context: Runner context. @@ -115,7 +151,17 @@ def setup_gemini_mcp( Returns: Path to the written settings file, or None if no MCP servers. """ - settings = build_gemini_mcp_settings(context, cwd_path) - if settings is None: - return None - return write_gemini_settings(cwd_path, settings) + settings = build_gemini_mcp_settings(context, cwd_path) or {"mcpServers": {}} + + # Always register the ambient-feedback server so evaluate_rubric and + # log_correction tools are available to custom commands. + settings["mcpServers"]["ambient-feedback"] = _build_feedback_server_entry() + + settings_path = write_gemini_settings(cwd_path, settings) + + # Write /ambient:evaluate-rubric and /ambient:log-correction custom commands. + from ambient_runner.bridges.gemini_cli.commands import write_gemini_commands + + write_gemini_commands(cwd_path) + + return settings_path diff --git a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/session.py b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/session.py index 0a1094a24..cac565ca9 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/session.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/session.py @@ -124,6 +124,18 @@ async def query( # take precedence and bypass Vertex). # See: https://geminicli.com/docs/get-started/authentication/ env["GOOGLE_GENAI_USE_VERTEXAI"] = "true" + # Map platform Vertex env vars to Gemini CLI's expected names if not + # already set. The platform uses ANTHROPIC_VERTEX_PROJECT_ID and + # CLOUD_ML_REGION for Claude; Gemini CLI needs GOOGLE_CLOUD_PROJECT + # and GOOGLE_CLOUD_LOCATION. + if not env.get("GOOGLE_CLOUD_PROJECT"): + project = env.get("ANTHROPIC_VERTEX_PROJECT_ID", "") + if project: + env["GOOGLE_CLOUD_PROJECT"] = project + if not env.get("GOOGLE_CLOUD_LOCATION"): + location = env.get("CLOUD_ML_REGION", "") + if location: + env["GOOGLE_CLOUD_LOCATION"] = location env.pop("GEMINI_API_KEY", None) env.pop("GOOGLE_API_KEY", None) elif self._api_key: diff --git a/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/system_prompt.py b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/system_prompt.py new file mode 100644 index 000000000..23c0406a9 --- /dev/null +++ b/components/runners/ambient-runner/ambient_runner/bridges/gemini_cli/system_prompt.py @@ -0,0 +1,235 @@ +"""Write a Gemini CLI system prompt that appends to the default. + +Sets GEMINI_SYSTEM_MD=true and writes .gemini/system.md. + +Since GEMINI_SYSTEM_MD is a full replacement, we use the documented variable +substitutions to pull in Gemini's dynamically-built default content, then +append platform additions after it: + + ${SubAgents} — injects the current available sub-agents section + ${AgentSkills} — injects the current available agent skills section + ${AvailableTools} — injects the current enabled tools list + +This gives true "append" semantics without hardcoding any static default text +that could drift with Gemini CLI version updates. +""" + +import logging +import os +from pathlib import Path + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Platform-specific prompt fragments +# --------------------------------------------------------------------------- + +_GEMINI_COMMAND_HINTS = """\ +## Ambient Platform Commands + +Use these slash commands when applicable: + +- `/ambient:evaluate-rubric` — when a workflow rubric exists, run this after + completing the task to evaluate your output against the criteria and log the + score to Langfuse. +- `/ambient:log-correction` — when the user redirects or corrects something + you already did or assumed, run this BEFORE fixing the issue to log the + correction to the improvement feedback loop. + +""" + +_CORRECTION_DETECTION = """\ +## CRITICAL: Correction Logging + +**BEFORE acting on user feedback that changes something you already did or +assumed, ALWAYS ask yourself: "Is the user steering me away from a previous +action or decision?"** + +If YES → run `/ambient:log-correction` FIRST, then fix. + +**Rule**: Any message that redirects, corrects, or changes your previous work +or assumptions = log it. When in doubt, log it. This includes quality issues, +redirections, preference changes, missed requirements, wrong scope, or any +context that changes what you should have done. + +""" + +_RUBRIC_INSTRUCTIONS_TEMPLATE = """\ +## Rubric Evaluation + +This workflow includes a scoring rubric for evaluating outputs.{activation} + +**Process**: +1. Complete the requested task +2. Run `/ambient:evaluate-rubric` — it will read the rubric and guide you + through scoring your output against each criterion +3. Be honest and calibrated; provide clear reasoning for each dimension + +""" + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def write_gemini_system_prompt(cwd_path: str) -> str: + """Write .gemini/system.md and return its absolute path. + + The file starts with Gemini's variable substitutions (${SubAgents}, + ${AgentSkills}, ${AvailableTools}) to pull in the dynamically-built + default content, then appends platform-specific context. + + Also sets os.environ["GEMINI_SYSTEM_MD"] = "true" so every Gemini CLI + subprocess spawned in this session inherits it. + + Args: + cwd_path: The session working directory (Gemini project root). + + Returns: + Absolute path to the written system.md, or empty string on failure. + """ + content = _build_system_prompt(cwd_path) + + gemini_dir = Path(cwd_path) / ".gemini" + gemini_dir.mkdir(parents=True, exist_ok=True) + system_md = gemini_dir / "system.md" + + try: + system_md.write_text(content, encoding="utf-8") + system_md.chmod(0o644) + except OSError as exc: + logger.warning("Could not write .gemini/system.md: %s", exc) + return "" + + abs_path = str(system_md.resolve()) + os.environ["GEMINI_SYSTEM_MD"] = "true" + logger.info("Wrote Gemini system prompt to %s", abs_path) + return abs_path + + +# --------------------------------------------------------------------------- +# Content builder +# --------------------------------------------------------------------------- + + +def _build_system_prompt(cwd_path: str) -> str: + """Build the full system.md content string.""" + from ambient_runner.platform.config import get_repos_config, load_ambient_config + from ambient_runner.platform.prompts import ( + GITHUB_TOKEN_PROMPT, + GITLAB_TOKEN_PROMPT, + GIT_PUSH_INSTRUCTIONS_BODY, + GIT_PUSH_INSTRUCTIONS_HEADER, + GIT_PUSH_STEPS, + MCP_INTEGRATIONS_PROMPT, + WORKSPACE_FIXED_PATHS_PROMPT, + ) + from ambient_runner.platform.utils import derive_workflow_name + + # Pull in Gemini's dynamically-built default sections via variable substitution. + # These are expanded at runtime by the CLI — no static text to maintain. + sections = [ + "${SubAgents}", + "", + "${AgentSkills}", + "", + "${AvailableTools}", + "", + "---", + "", + "# Ambient Code Platform", + "", + ] + + # ---- Workspace paths ---- + sections.append("## Workspace Structure\n") + sections.append(WORKSPACE_FIXED_PATHS_PROMPT) + + # ---- Repos + git push instructions ---- + repos_cfg = get_repos_config() + active_workflow_url = (os.getenv("ACTIVE_WORKFLOW_GIT_URL") or "").strip() + session_id = os.getenv("AGENTIC_SESSION_NAME", "").strip() + feature_branch = f"ambient/{session_id}" if session_id else None + + if repos_cfg: + repo_names = [r.get("name", f"repo-{i}") for i, r in enumerate(repos_cfg)] + display = [f"repos/{n}/" for n in repo_names] + if len(display) <= 5: + sections.append(f"**Repositories**: {', '.join(display)}\n") + else: + sections.append( + f"**Repositories** ({len(display)} total): " + f"{', '.join(display[:5])}, and {len(display) - 5} more\n" + ) + if feature_branch: + sections.append(f"**Working Branch**: `{feature_branch}`\n") + sections.append("") + + auto_push = [r for r in repos_cfg if r.get("autoPush", False)] + if auto_push: + branch = feature_branch or "ambient/" + sections.append(GIT_PUSH_INSTRUCTIONS_HEADER) + sections.append(GIT_PUSH_INSTRUCTIONS_BODY) + for r in auto_push: + sections.append(f"- **repos/{r.get('name', 'unknown')}/**") + sections.append(GIT_PUSH_STEPS.format(branch=branch)) + + # ---- Workflow directory ---- + if active_workflow_url: + workflow_name = derive_workflow_name(active_workflow_url) + if workflow_name: + sections.append( + f"**Working Directory**: `workflows/{workflow_name}/` " + "(workflow logic — do not create files here)\n" + ) + + # ---- File uploads ---- + uploads = Path(os.getenv("WORKSPACE_PATH", "/workspace")) / "file-uploads" + if uploads.exists(): + try: + files = sorted(f.name for f in uploads.iterdir() if f.is_file()) + if files: + if len(files) <= 10: + sections.append(f"**Uploaded Files**: {', '.join(files)}\n") + else: + sections.append( + f"**Uploaded Files** ({len(files)} total): " + f"{', '.join(files[:10])}, and {len(files) - 10} more\n" + ) + except Exception as exc: + logger.warning("Could not list uploaded files in %s: %s", uploads, exc) + + # ---- MCP integration hints ---- + sections.append(MCP_INTEGRATIONS_PROMPT) + + # ---- Token visibility ---- + if os.getenv("GITHUB_TOKEN"): + sections.append(GITHUB_TOKEN_PROMPT) + if os.getenv("GITLAB_TOKEN"): + sections.append(GITLAB_TOKEN_PROMPT) + + # ---- Workflow custom instructions ---- + ambient_config: dict = {} + if active_workflow_url: + ambient_config = load_ambient_config(cwd_path) or {} + if ambient_config.get("systemPrompt"): + sections.append(f"## Workflow Instructions\n\n{ambient_config['systemPrompt']}\n") + + # ---- Rubric instructions (when rubric config exists) ---- + rubric_config = ambient_config.get("rubric", {}) + if rubric_config: + activation = rubric_config.get("activationPrompt", "") + activation_str = f" {activation}" if activation else "" + sections.append(_RUBRIC_INSTRUCTIONS_TEMPLATE.format(activation=activation_str)) + + # ---- Custom command hints (always included) ---- + sections.append(_GEMINI_COMMAND_HINTS) + + # ---- Corrections (when Langfuse enabled) ---- + from ambient_runner.observability import is_langfuse_enabled + + if is_langfuse_enabled(): + sections.append(_CORRECTION_DETECTION) + + return "\n".join(sections) diff --git a/components/runners/ambient-runner/ambient_runner/platform/auth.py b/components/runners/ambient-runner/ambient_runner/platform/auth.py index bd60b4ae7..1d6f2ae39 100644 --- a/components/runners/ambient-runner/ambient_runner/platform/auth.py +++ b/components/runners/ambient-runner/ambient_runner/platform/auth.py @@ -22,6 +22,36 @@ _PLACEHOLDER_EMAIL = "user@example.com" +# --------------------------------------------------------------------------- +# Vertex AI credential validation (shared across all bridges) +# --------------------------------------------------------------------------- + + +def validate_vertex_credentials_file(context: RunnerContext) -> str: + """Validate that GOOGLE_APPLICATION_CREDENTIALS is set and the file exists. + + Shared by all bridge auth modules so the check and error messages are + consistent regardless of which runner is in use. + + Args: + context: Runner context used to resolve the env var. + + Returns: + The resolved credentials file path. + + Raises: + RuntimeError: If the env var is unset or the file does not exist. + """ + path = context.get_env("GOOGLE_APPLICATION_CREDENTIALS", "").strip() + if not path: + raise RuntimeError( + "GOOGLE_APPLICATION_CREDENTIALS must be set when USE_VERTEX is enabled" + ) + if not Path(path).exists(): + raise RuntimeError(f"Service account key file not found at {path}") + return path + + # --------------------------------------------------------------------------- # User context sanitization # --------------------------------------------------------------------------- diff --git a/components/runners/ambient-runner/ambient_runner/platform/feedback.py b/components/runners/ambient-runner/ambient_runner/platform/feedback.py new file mode 100644 index 000000000..7c7061805 --- /dev/null +++ b/components/runners/ambient-runner/ambient_runner/platform/feedback.py @@ -0,0 +1,105 @@ +"""Platform-level feedback utilities shared across all bridges. + +Provides public wrappers around Langfuse score logging so that any +bridge (Claude, Gemini, future) can log rubric evaluations and +corrections without depending on Claude-bridge internals. +""" + +from ambient_runner.bridges.claude.corrections import ( + CORRECTION_SOURCES, + CORRECTION_TYPES, + _get_session_context, + _log_correction_to_langfuse, + build_target_map, +) +from ambient_runner.bridges.claude.tools import _log_to_langfuse + +__all__ = [ + "CORRECTION_SOURCES", + "CORRECTION_TYPES", + "log_rubric_score", + "log_correction", + "get_session_context", + "build_target_map", +] + +# Module-level cache for get_session_context() — the function runs subprocess +# git commands that are slow and stable within a single process lifetime. +_session_context_cache: dict | None = None + + +def get_session_context() -> dict: + """Return auto-captured session context (repos, workflow, session name). + + Result is cached for the process lifetime since repos and workflow info + don't change mid-session. Reads from environment variables and falls back + to workspace filesystem scanning when REPOS_JSON is not set. + """ + global _session_context_cache + if _session_context_cache is None: + _session_context_cache = _get_session_context() + return _session_context_cache + + +def log_rubric_score( + score: float | None, + comment: str, + session_id: str, + obs=None, + metadata=None, +) -> tuple[bool, str | None]: + """Log a rubric evaluation score to Langfuse. + + Args: + score: Numeric evaluation score. + comment: Evaluation reasoning. + session_id: Current session ID. + obs: Optional ObservabilityManager for trace correlation. + metadata: Optional schema-driven metadata dict. + + Returns: + (success, error_message) tuple. + """ + return _log_to_langfuse( + score=score, + comment=comment, + metadata=metadata, + obs=obs, + session_id=session_id, + ) + + +def log_correction( + correction_type: str, + agent_action: str, + user_correction: str, + session_id: str, + target_label: str = "", + obs=None, + source: str = "human", +) -> tuple[bool, str | None]: + """Log a correction to Langfuse for the improvement feedback loop. + + Args: + correction_type: One of CORRECTION_TYPES. + agent_action: What the agent did or assumed. + user_correction: What the user said should have happened instead. + session_id: Current session ID. + target_label: Optional target label resolved against the target map. + obs: Optional ObservabilityManager for trace correlation. + source: One of CORRECTION_SOURCES ('human' or 'rubric'). + + Returns: + (success, error_message) tuple. + """ + target_map = build_target_map(get_session_context()) + return _log_correction_to_langfuse( + correction_type=correction_type, + agent_action=agent_action, + user_correction=user_correction, + target_label=target_label, + target_map=target_map, + obs=obs, + session_id=session_id, + source=source, + )