Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions .github/workflows/components-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,13 @@ jobs:
if [ "${{ needs.detect-changes.outputs.claude-runner }}" == "true" ]; then
echo "runner_tag=${{ github.sha }}" >> $GITHUB_OUTPUT
else
echo "runner_tag=stage" >> $GITHUB_OUTPUT
echo "runner_tag=latest" >> $GITHUB_OUTPUT
fi

if [ "${{ needs.detect-changes.outputs.state-sync }}" == "true" ]; then
echo "state_sync_tag=${{ github.sha }}" >> $GITHUB_OUTPUT
else
echo "state_sync_tag=stage" >> $GITHUB_OUTPUT
echo "state_sync_tag=latest" >> $GITHUB_OUTPUT
fi

if [ "${{ needs.detect-changes.outputs.public-api }}" == "true" ]; then
Expand Down Expand Up @@ -303,6 +303,27 @@ jobs:
AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:${{ steps.image-tags.outputs.runner_tag }}" \
STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:${{ steps.image-tags.outputs.state_sync_tag }}"

- name: Update agent registry ConfigMap with pinned image tags
if: needs.detect-changes.outputs.claude-runner == 'true' || needs.detect-changes.outputs.state-sync == 'true'
run: |
# Fetch live JSON from cluster so unchanged images keep their current tag
REGISTRY=$(oc get configmap ambient-agent-registry -n ambient-code \
-o jsonpath='{.data.agent-registry\.json}')

# Only replace images that were actually rebuilt this run.
# Pattern [@:][^"]* matches both :tag and @sha256:digest refs.
if [ "${{ needs.detect-changes.outputs.claude-runner }}" == "true" ]; then
REGISTRY=$(echo "$REGISTRY" | sed \
"s|quay.io/ambient_code/vteam_claude_runner[@:][^\"]*|quay.io/ambient_code/vteam_claude_runner:${{ github.sha }}|g")
fi
if [ "${{ needs.detect-changes.outputs.state-sync }}" == "true" ]; then
REGISTRY=$(echo "$REGISTRY" | sed \
"s|quay.io/ambient_code/vteam_state_sync[@:][^\"]*|quay.io/ambient_code/vteam_state_sync:${{ github.sha }}|g")
fi

oc patch configmap ambient-agent-registry -n ambient-code --type=merge \
-p "{\"data\":{\"agent-registry.json\":$(echo "$REGISTRY" | jq -Rs .)}}"

deploy-with-disptach:
runs-on: ubuntu-latest
needs: [detect-changes, build-and-push, update-rbac-and-crd]
Expand Down Expand Up @@ -357,5 +378,15 @@ jobs:
- name: Update operator environment variables
run: |
oc set env deployment/agentic-operator -n ambient-code -c agentic-operator \
AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:stage" \
STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:stage"
AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:${{ github.sha }}" \
STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:${{ github.sha }}"

- name: Update agent registry ConfigMap with pinned image tags
run: |
REGISTRY=$(oc get configmap ambient-agent-registry -n ambient-code \
-o jsonpath='{.data.agent-registry\.json}')
REGISTRY=$(echo "$REGISTRY" | sed \
-e "s|quay.io/ambient_code/vteam_claude_runner[@:][^\"]*|quay.io/ambient_code/vteam_claude_runner:${{ github.sha }}|g" \
-e "s|quay.io/ambient_code/vteam_state_sync[@:][^\"]*|quay.io/ambient_code/vteam_state_sync:${{ github.sha }}|g")
oc patch configmap ambient-agent-registry -n ambient-code --type=merge \
-p "{\"data\":{\"agent-registry.json\":$(echo "$REGISTRY" | jq -Rs .)}}"
15 changes: 15 additions & 0 deletions .github/workflows/prod-release-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ on:
required: false
type: string
default: ''
concurrency:
group: prod-release-deploy
cancel-in-progress: false

jobs:
release:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -273,3 +277,14 @@ jobs:
oc set env deployment/agentic-operator -n ambient-code -c agentic-operator \
AMBIENT_CODE_RUNNER_IMAGE="quay.io/ambient_code/vteam_claude_runner:${{ needs.release.outputs.new_tag }}" \
STATE_SYNC_IMAGE="quay.io/ambient_code/vteam_state_sync:${{ needs.release.outputs.new_tag }}"

- name: Update agent registry ConfigMap with release image tags
run: |
RELEASE_TAG="${{ needs.release.outputs.new_tag }}"
REGISTRY=$(oc get configmap ambient-agent-registry -n ambient-code \
-o jsonpath='{.data.agent-registry\.json}')
REGISTRY=$(echo "$REGISTRY" | sed \
-e "s|quay.io/ambient_code/vteam_claude_runner[@:][^\"]*|quay.io/ambient_code/vteam_claude_runner:${RELEASE_TAG}|g" \
-e "s|quay.io/ambient_code/vteam_state_sync[@:][^\"]*|quay.io/ambient_code/vteam_state_sync:${RELEASE_TAG}|g")
oc patch configmap ambient-agent-registry -n ambient-code --type=merge \
-p "{\"data\":{\"agent-registry.json\":$(echo "$REGISTRY" | jq -Rs .)}}"
65 changes: 62 additions & 3 deletions components/runners/ambient-runner/ambient_runner/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ async def interrupt(self, thread_id=None) -> None:
pass
"""

import asyncio
import logging
import os
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Optional
Expand All @@ -43,6 +45,30 @@ async def interrupt(self, thread_id=None) -> None:
TOOL_REFRESH_MIN_INTERVAL_SEC = 30


def _async_safe_manager_shutdown(manager: Any) -> None:
"""Fire-and-forget async shutdown of a session manager from sync context.

Used by ``mark_dirty()`` implementations in all bridges. Handles both
cases: called from within a running event loop (schedules as a task)
and called outside any loop (blocks via ``asyncio.run``).
"""
try:
loop = asyncio.get_running_loop()
task = loop.create_task(manager.shutdown())
task.add_done_callback(
lambda f: _bridge_logger.warning(
"mark_dirty: session_manager shutdown error: %s", f.exception()
)
if f.exception()
else None
)
except RuntimeError:
try:
asyncio.run(manager.shutdown())
except Exception as exc:
_bridge_logger.warning("mark_dirty: session_manager shutdown error: %s", exc)


@dataclass
class FrameworkCapabilities:
"""Declares what a framework adapter supports.
Expand Down Expand Up @@ -77,6 +103,11 @@ class PlatformBridge(ABC):
- ``get_error_context()`` — returns extra error info for failed runs
"""

def __init__(self) -> None:
self._context: Optional[RunnerContext] = None
self._ready: bool = False
self._last_creds_refresh: float = 0.0

# ------------------------------------------------------------------
# Required (abstract)
# ------------------------------------------------------------------
Expand Down Expand Up @@ -116,11 +147,39 @@ async def interrupt(self, thread_id: Optional[str] = None) -> None:
# ------------------------------------------------------------------

def set_context(self, context: RunnerContext) -> None:
"""Store the runner context for later use.
"""Store the runner context (called from lifespan before any requests)."""
self._context = context

Called by the platform lifespan before any requests are served.
Override to capture the context for use in ``run()``.
async def _refresh_credentials_if_stale(self) -> None:
"""Refresh platform credentials if the refresh interval has elapsed.

Call this at the start of each ``run()`` to keep tokens fresh.
"""
now = time.monotonic()
if now - self._last_creds_refresh > CREDS_REFRESH_INTERVAL_SEC:
from ambient_runner.platform.auth import populate_runtime_credentials

await populate_runtime_credentials(self._context)
self._last_creds_refresh = now

async def _ensure_ready(self) -> None:
"""Run one-time platform setup on the first ``run()`` call.

Calls ``_setup_platform()`` the first time, then sets ``self._ready``.
"""
if self._ready:
return
if not self._context:
raise RuntimeError("Context not set — call set_context() first")
await self._setup_platform()
self._ready = True
_bridge_logger.info(
"Platform ready — model: %s",
getattr(self, "_configured_model", ""),
)

async def _setup_platform(self) -> None:
"""Framework-specific platform setup. Override in each bridge."""
pass

async def shutdown(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import logging
import os
from pathlib import Path

from ambient_runner.platform.auth import validate_vertex_credentials_file
from ambient_runner.platform.context import RunnerContext
from ambient_runner.platform.utils import is_vertex_enabled

Expand Down Expand Up @@ -38,26 +38,17 @@ def map_to_vertex_model(model: str) -> str:

async def setup_vertex_credentials(context: RunnerContext) -> dict:
"""Set up Google Cloud Vertex AI credentials from service account."""
service_account_path = context.get_env("GOOGLE_APPLICATION_CREDENTIALS", "").strip()
service_account_path = validate_vertex_credentials_file(context)
project_id = context.get_env("ANTHROPIC_VERTEX_PROJECT_ID", "").strip()
region = context.get_env("CLOUD_ML_REGION", "").strip()

if not service_account_path:
raise RuntimeError(
"GOOGLE_APPLICATION_CREDENTIALS must be set when USE_VERTEX is enabled"
)
if not project_id:
raise RuntimeError(
"ANTHROPIC_VERTEX_PROJECT_ID must be set when USE_VERTEX is enabled"
)
if not region:
raise RuntimeError("CLOUD_ML_REGION must be set when USE_VERTEX is enabled")

if not Path(service_account_path).exists():
raise RuntimeError(
f"Service account key file not found at {service_account_path}"
)

logger.info(f"Vertex AI configured: project={project_id}, region={region}")
return {
"credentials_path": service_account_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
- Interrupt and graceful shutdown
"""

import asyncio
import logging
import os
import time
Expand All @@ -19,9 +18,9 @@
from ag_ui_claude_sdk import ClaudeAgentAdapter

from ambient_runner.bridge import (
CREDS_REFRESH_INTERVAL_SEC,
FrameworkCapabilities,
PlatformBridge,
_async_safe_manager_shutdown,
setup_bridge_observability,
)
from ambient_runner.bridges.claude.session import SessionManager
Expand All @@ -43,13 +42,12 @@ class ClaudeBridge(PlatformBridge):
"""

def __init__(self) -> None:
super().__init__()
self._adapter: ClaudeAgentAdapter | None = None
self._session_manager: SessionManager | None = None
self._obs: Any = None
self._context: RunnerContext | None = None

# Platform state (populated by _setup_platform)
self._ready: bool = False
self._first_run: bool = True
self._configured_model: str = ""
self._cwd_path: str = ""
Expand All @@ -58,7 +56,6 @@ def __init__(self) -> None:
self._allowed_tools: list[str] = []
self._system_prompt: dict = {}
self._stderr_lines: list[str] = []
self._last_creds_refresh: float = 0.0

# ------------------------------------------------------------------
# PlatformBridge interface
Expand Down Expand Up @@ -89,14 +86,7 @@ async def run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]:
"""Full run lifecycle: lazy setup → adapter → session worker → tracing."""
# 1. Lazy platform setup
await self._ensure_ready()

# Refresh credentials if stale (tokens may have expired)
now = time.monotonic()
if now - self._last_creds_refresh > CREDS_REFRESH_INTERVAL_SEC:
from ambient_runner.platform.auth import populate_runtime_credentials

await populate_runtime_credentials(self._context)
self._last_creds_refresh = now
await self._refresh_credentials_if_stale()

# 2. Ensure adapter exists
self._ensure_adapter()
Expand Down Expand Up @@ -153,10 +143,6 @@ async def interrupt(self, thread_id: Optional[str] = None) -> None:
# Lifecycle methods
# ------------------------------------------------------------------

def set_context(self, context: RunnerContext) -> None:
"""Store the runner context (called from lifespan)."""
self._context = context

async def shutdown(self) -> None:
"""Graceful shutdown: persist sessions, finalise tracing."""
if self._session_manager:
Expand All @@ -179,22 +165,7 @@ def mark_dirty(self) -> None:
if self._session_manager:
manager = self._session_manager
self._session_manager = None
try:
asyncio.get_running_loop() # raises RuntimeError if no loop
future = asyncio.ensure_future(manager.shutdown())
future.add_done_callback(
lambda f: logger.warning(
"mark_dirty: session_manager shutdown error: %s", f.exception()
)
if f.exception()
else None
)
except RuntimeError:
# No running loop — safe to block
try:
asyncio.run(manager.shutdown())
except Exception as e:
logger.warning("mark_dirty: session_manager shutdown error: %s", e)
_async_safe_manager_shutdown(manager)
logger.info("ClaudeBridge: marked dirty — will reinitialise on next run")

def get_error_context(self) -> str:
Expand Down Expand Up @@ -300,18 +271,6 @@ def session_manager(self) -> SessionManager | None:
# Private: platform setup (lazy, called on first run)
# ------------------------------------------------------------------

async def _ensure_ready(self) -> None:
"""Run one-time platform setup if not already done."""
if self._ready:
return
if not self._context:
raise RuntimeError("Context not set — call set_context() first")
await self._setup_platform()
self._ready = True
logger.info(
f"Platform ready — model: {self._configured_model}, cwd: {self._cwd_path}"
)

async def _setup_platform(self) -> None:
"""Full platform setup: auth, workspace, MCP, observability."""
# Session manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging

from ambient_runner.platform.auth import validate_vertex_credentials_file
from ambient_runner.platform.context import RunnerContext
from ambient_runner.platform.utils import is_vertex_enabled

Expand All @@ -26,8 +27,19 @@ async def setup_gemini_cli_auth(context: RunnerContext) -> tuple[str, str, bool]
use_vertex = is_vertex_enabled(legacy_var="GEMINI_USE_VERTEX", context=context)

if use_vertex:
project = context.get_env("GOOGLE_CLOUD_PROJECT", "").strip()
location = context.get_env("GOOGLE_CLOUD_LOCATION", "").strip()
validate_vertex_credentials_file(context)

# Resolve project/location — may be set directly or via platform aliases
# (ANTHROPIC_VERTEX_PROJECT_ID / CLOUD_ML_REGION). The subprocess mapping
# in session.py handles the latter; here we just log what we found.
project = (
context.get_env("GOOGLE_CLOUD_PROJECT", "").strip()
or context.get_env("ANTHROPIC_VERTEX_PROJECT_ID", "").strip()
)
location = (
context.get_env("GOOGLE_CLOUD_LOCATION", "").strip()
or context.get_env("CLOUD_ML_REGION", "").strip()
)

logger.info(
"Gemini CLI: Vertex AI mode (project=%s, location=%s, model=%s)",
Expand Down
Loading
Loading