From 8825fd94ab49a3ae0f88c4e0055263bbfe360001 Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Fri, 6 Mar 2026 15:46:50 -0500 Subject: [PATCH 1/8] docs(public-api): add OpenAPI spec and proposal for session control endpoints Captures the planned v1 API surface including session CRUD, messaging, output retrieval, runs, start/stop, and interrupt. --- components/public-api/openapi.yaml | 757 ++++++++++++++++++ .../proposals/public-api-session-control.md | 79 ++ 2 files changed, 836 insertions(+) create mode 100644 components/public-api/openapi.yaml create mode 100644 docs/internal/proposals/public-api-session-control.md diff --git a/components/public-api/openapi.yaml b/components/public-api/openapi.yaml new file mode 100644 index 000000000..3de09be21 --- /dev/null +++ b/components/public-api/openapi.yaml @@ -0,0 +1,757 @@ +openapi: 3.0.3 +info: + title: Ambient Code Public API + description: Simplified, versioned REST API gateway for the Ambient Code Platform. + version: 1.0.0 +servers: + - url: https://public-api-ambient-code.apps.okd1.timslab/v1 + description: Lab OKD cluster + - url: http://localhost:8081/v1 + description: Local development + +security: + - BearerAuth: [] + +paths: + /sessions: + get: + summary: List sessions + operationId: listSessions + parameters: + - $ref: '#/components/parameters/ProjectHeader' + responses: + '200': + description: List of sessions + content: + application/json: + schema: + $ref: '#/components/schemas/SessionListResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '502': + $ref: '#/components/responses/BadGateway' + + post: + summary: Create a session + operationId: createSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateSessionRequest' + responses: + '201': + description: Session created + content: + application/json: + schema: + type: object + properties: + id: + type: string + example: session-abc123 + message: + type: string + example: Session created + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}: + get: + summary: Get session details + operationId: getSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '200': + description: Session details + content: + application/json: + schema: + $ref: '#/components/schemas/SessionResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '502': + $ref: '#/components/responses/BadGateway' + + delete: + summary: Delete a session + operationId: deleteSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '204': + description: Session deleted + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/message: + post: + summary: Send a message to a session + description: > + Sends a user message to the session. The public API handles all + AG-UI protocol details (run ID generation, message envelope + construction, thread mapping). The resulting run executes + asynchronously; use GET /sessions/{id}/runs to track progress + and GET /sessions/{id}/output to retrieve results. + operationId: sendMessage + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SendMessageRequest' + responses: + '202': + description: Message accepted, run created + content: + application/json: + schema: + $ref: '#/components/schemas/SendMessageResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/output: + get: + summary: Get session output + description: > + Returns session output in one of three formats controlled by the + `format` query parameter. + + + - **transcript** (default) — Assembled conversation messages extracted + from MESSAGES_SNAPSHOT events. Each message has a role, full content, + and optional tool calls. This is the smallest and most useful format + for most consumers. + + - **compact** — AG-UI events with streaming deltas merged. All + TEXT_MESSAGE_CONTENT deltas for the same message are concatenated + into a single event, and likewise for TOOL_CALL_ARGS. Significantly + smaller than raw events while preserving full event structure. + + - **events** — Raw AG-UI events exactly as persisted. Includes every + individual streaming delta. Can be very large even for short sessions. + + + All formats support optional `run_id` filtering. + operationId: getSessionOutput + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + - name: format + in: query + required: false + description: Output format + schema: + type: string + enum: [transcript, compact, events] + default: transcript + - name: run_id + in: query + required: false + description: Filter output to a specific run (must be a valid UUID) + schema: + type: string + format: uuid + responses: + '200': + description: > + Session output. Response schema depends on the `format` parameter. + content: + application/json: + schema: + oneOf: + - $ref: '#/components/schemas/TranscriptOutputResponse' + - $ref: '#/components/schemas/EventsOutputResponse' + discriminator: + propertyName: format + mapping: + transcript: '#/components/schemas/TranscriptOutputResponse' + compact: '#/components/schemas/EventsOutputResponse' + events: '#/components/schemas/EventsOutputResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + description: Session or run not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/runs: + post: + summary: Create a run + description: > + Creates a new AG-UI run with full control over the run input. + The caller provides the complete RunAgentInput payload including + run ID, thread ID, and messages. For a simpler interface that + handles AG-UI details automatically, use POST /sessions/{id}/message. + operationId: createRun + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateRunRequest' + responses: + '202': + description: Run accepted + content: + application/json: + schema: + $ref: '#/components/schemas/CreateRunResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '502': + $ref: '#/components/responses/BadGateway' + + get: + summary: List runs in a session + description: > + Returns a summary of all AG-UI runs in the session, including + status, timestamps, event counts, and the originating user message. + operationId: getSessionRuns + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '200': + description: List of runs + content: + application/json: + schema: + $ref: '#/components/schemas/SessionRunsResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/start: + post: + summary: Start (resume) a session + description: > + Resumes a stopped or completed session. The session transitions + back to Running state and can accept new messages. Returns 409 + if the session is already running or pending. + operationId: startSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '202': + description: Start accepted + content: + application/json: + schema: + $ref: '#/components/schemas/SessionResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '422': + description: Session is already running or pending + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/stop: + post: + summary: Stop a session + description: > + Stops a running session. The session's pod is terminated and the + session transitions to a completed state. This is a session-level + lifecycle action — use interrupt instead to cancel only the + current run without killing the session. Returns 409 if the + session is already stopped, completed, or failed. + operationId: stopSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '202': + description: Stop accepted + content: + application/json: + schema: + $ref: '#/components/schemas/SessionResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '422': + description: Session is not in a running state + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + + /sessions/{id}/interrupt: + post: + summary: Interrupt the current run + description: > + Sends an interrupt signal to cancel the agent's current execution + without killing the session. The session remains in Running state + and can accept new messages. Equivalent to the red "Stop" button + in the chat UI. + operationId: interruptSession + parameters: + - $ref: '#/components/parameters/ProjectHeader' + - $ref: '#/components/parameters/SessionID' + responses: + '200': + description: Interrupt signal sent + content: + application/json: + schema: + $ref: '#/components/schemas/MessageResponse' + '400': + $ref: '#/components/responses/BadRequest' + '401': + $ref: '#/components/responses/Unauthorized' + '404': + $ref: '#/components/responses/NotFound' + '422': + description: Session not in a state that can be interrupted + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '502': + $ref: '#/components/responses/BadGateway' + +components: + securitySchemes: + BearerAuth: + type: http + scheme: bearer + description: OpenShift token or access key + + parameters: + ProjectHeader: + name: X-Ambient-Project + in: header + required: true + description: Target project/namespace + schema: + type: string + pattern: '^[a-z0-9][a-z0-9-]*[a-z0-9]$' + example: my-project + + SessionID: + name: id + in: path + required: true + description: Session ID (valid Kubernetes name) + schema: + type: string + pattern: '^[a-z0-9][a-z0-9-]*[a-z0-9]$' + example: session-abc123 + + schemas: + SessionResponse: + type: object + properties: + id: + type: string + status: + type: string + enum: [pending, running, completed, failed] + display_name: + type: string + task: + type: string + model: + type: string + repos: + type: array + description: Repositories configured for this session + items: + $ref: '#/components/schemas/SessionRepo' + createdAt: + type: string + format: date-time + completedAt: + type: string + format: date-time + result: + type: string + error: + type: string + + SessionListResponse: + type: object + properties: + items: + type: array + items: + $ref: '#/components/schemas/SessionResponse' + total: + type: integer + + CreateSessionRequest: + type: object + required: [task] + properties: + task: + type: string + description: The initial prompt / task for the agent + display_name: + type: string + description: Human-readable name for the session + model: + type: string + repos: + type: array + items: + type: object + required: [url] + properties: + url: + type: string + branch: + type: string + + CreateRunRequest: + type: object + required: [messages] + properties: + run_id: + type: string + format: uuid + description: Caller-provided run ID. Generated server-side if omitted. + thread_id: + type: string + description: AG-UI thread ID. Defaults to the session ID if omitted. + messages: + type: array + minItems: 1 + items: + $ref: '#/components/schemas/RunMessage' + + CreateRunResponse: + type: object + properties: + run_id: + type: string + format: uuid + thread_id: + type: string + + RunMessage: + type: object + required: [role, content] + properties: + id: + type: string + format: uuid + description: Message ID. Generated server-side if omitted. + role: + type: string + enum: [user, assistant, system] + content: + type: string + + SendMessageRequest: + type: object + required: [content] + properties: + content: + type: string + description: The user message to send to the agent + + SendMessageResponse: + type: object + properties: + run_id: + type: string + format: uuid + description: UUID identifying this run + thread_id: + type: string + description: Session ID (same as the path parameter) + + TranscriptOutputResponse: + type: object + description: > + Transcript format (default). Returns assembled conversation messages + extracted from MESSAGES_SNAPSHOT events emitted at the end of each run. + properties: + session_id: + type: string + format: + type: string + enum: [transcript] + messages: + type: array + description: Assembled conversation messages in chronological order + items: + $ref: '#/components/schemas/TranscriptMessage' + + TranscriptMessage: + type: object + description: > + A single assembled message from the conversation. Extracted from + MESSAGES_SNAPSHOT events which the runner emits at the end of each + run containing the full conversation up to that point. + properties: + id: + type: string + description: Message ID + role: + type: string + enum: [user, assistant, system, tool, developer] + content: + type: string + description: Full message content + tool_calls: + type: array + description: Tool calls made by the assistant (present when role is assistant) + items: + $ref: '#/components/schemas/TranscriptToolCall' + tool_call_id: + type: string + description: ID of the tool call this message is a response to (present when role is tool) + name: + type: string + description: Tool name (present when role is tool) + timestamp: + type: string + + TranscriptToolCall: + type: object + properties: + id: + type: string + name: + type: string + args: + type: string + description: JSON-encoded tool call arguments + result: + type: string + description: Tool call result (if completed) + status: + type: string + enum: [pending, running, completed, error] + duration: + type: integer + format: int64 + description: Execution time in milliseconds + + EventsOutputResponse: + type: object + description: > + Events format. Used for both `compact` and `events` formats. + In compact mode, streaming deltas (TEXT_MESSAGE_CONTENT, + TOOL_CALL_ARGS) are merged per message/tool call. In events + mode, raw events are returned as persisted. + properties: + session_id: + type: string + format: + type: string + enum: [compact, events] + events: + type: array + description: AG-UI protocol events in emission order + items: + $ref: '#/components/schemas/AGUIEvent' + + AGUIEvent: + type: object + description: > + A single AG-UI protocol event. The `type` field determines which + additional fields are present. All events carry `type` and `runId`; + other fields depend on the event type. + required: [type] + properties: + type: + type: string + enum: + - RUN_STARTED + - RUN_FINISHED + - RUN_ERROR + - TEXT_MESSAGE_START + - TEXT_MESSAGE_CONTENT + - TEXT_MESSAGE_END + - TOOL_CALL_START + - TOOL_CALL_ARGS + - TOOL_CALL_END + - STEP_STARTED + - STEP_FINISHED + - STATE_DELTA + - CUSTOM + description: AG-UI event type + runId: + type: string + format: uuid + description: Run this event belongs to + threadId: + type: string + description: Thread (session) this event belongs to + timestamp: + type: integer + format: int64 + description: Unix timestamp in milliseconds + messageId: + type: string + description: Message ID (present on TEXT_MESSAGE_* events) + role: + type: string + enum: [user, assistant, system] + description: Message role (present on TEXT_MESSAGE_START) + content: + type: string + description: Full message content (present on TEXT_MESSAGE_START for user messages) + delta: + type: string + description: > + Incremental content chunk (present on TEXT_MESSAGE_CONTENT, + TOOL_CALL_ARGS). In compact format, all deltas for the same + message/tool call are merged into a single event. + toolCallId: + type: string + description: Tool call ID (present on TOOL_CALL_* events) + toolCallName: + type: string + description: Tool name (present on TOOL_CALL_START) + stepName: + type: string + description: Step name (present on STEP_STARTED, STEP_FINISHED) + rawEvent: + type: object + description: Custom event payload (present on CUSTOM events, e.g. thinking blocks) + additionalProperties: true + + SessionRunsResponse: + type: object + properties: + session_id: + type: string + runs: + type: array + items: + $ref: '#/components/schemas/RunSummary' + + RunSummary: + type: object + properties: + run_id: + type: string + format: uuid + started_at: + type: integer + format: int64 + description: Unix timestamp in milliseconds + finished_at: + type: integer + format: int64 + description: Unix timestamp in milliseconds + status: + type: string + enum: [running, completed, error] + user_message: + type: string + description: The user message that initiated this run + event_count: + type: integer + description: Total number of AG-UI events in this run + + SessionRepo: + type: object + properties: + url: + type: string + branch: + type: string + + MessageResponse: + type: object + properties: + message: + type: string + example: Interrupt signal sent + + ErrorResponse: + type: object + properties: + error: + type: string + message: + type: string + + responses: + BadRequest: + description: Invalid input + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + Unauthorized: + description: Missing or invalid authentication + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + NotFound: + description: Resource not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + BadGateway: + description: Backend unavailable + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' diff --git a/docs/internal/proposals/public-api-session-control.md b/docs/internal/proposals/public-api-session-control.md new file mode 100644 index 000000000..cc2da5e20 --- /dev/null +++ b/docs/internal/proposals/public-api-session-control.md @@ -0,0 +1,79 @@ +# Proposal: Public API Session Control Endpoints + +**Branch:** `feat/public-api-session-control` +**Date:** 2026-03-06 +**Status:** Draft +**Spec:** `components/public-api/openapi.yaml` + +--- + +## Summary + +Extend the public API from 4 session CRUD endpoints to a full session control surface. These endpoints let SDK/MCP clients manage the complete session lifecycle — send messages, retrieve output, start/stop sessions, and interrupt runs — without needing to understand the AG-UI protocol or construct complex request envelopes. + +All endpoints proxy to the existing Go backend. No new K8s operations are introduced in the public API. + +--- + +## Current State (Phase 1) + +The public API currently exposes only basic CRUD: + +| Method | Endpoint | Status | +|--------|----------|--------| +| GET | `/v1/sessions` | Implemented | +| POST | `/v1/sessions` | Implemented | +| GET | `/v1/sessions/{id}` | Implemented | +| DELETE | `/v1/sessions/{id}` | Implemented | + +--- + +## Proposed Endpoints + +### Messaging + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/v1/sessions/{id}/message` | **Simplified send.** Accepts `{"content": "..."}`, constructs the AG-UI `RunAgentInput` envelope server-side (generates run ID, thread ID, message ID). Proxies to backend `/agui/run`. | +| POST | `/v1/sessions/{id}/runs` | **Raw AG-UI run.** Accepts full `RunAgentInput` with caller-provided run/thread IDs and messages array. Direct proxy to backend `/agui/run`. | + +### Output Retrieval + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/v1/sessions/{id}/output` | Returns session output in one of three formats via `?format=` query param. | +| GET | `/v1/sessions/{id}/runs` | Lists all runs with status, timestamps, event counts, and originating user message. | + +**Output formats:** + +- **`transcript`** (default) — Assembled conversation messages from `MESSAGES_SNAPSHOT` events. Smallest, most useful for consumers. +- **`compact`** — AG-UI events with streaming deltas merged per message/tool call. Preserves event structure but significantly smaller than raw. +- **`events`** — Raw AG-UI events exactly as persisted. Can be very large. + +All formats support optional `?run_id=` filtering. + +### Lifecycle Control + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/v1/sessions/{id}/start` | Resume a stopped/completed session. Returns 422 if already running. | +| POST | `/v1/sessions/{id}/stop` | Stop a running session (kills pod). Returns 422 if already stopped. | +| POST | `/v1/sessions/{id}/interrupt` | Cancel the current run without killing the session. Equivalent to the red stop button in the UI. | + +--- + +## What This Enables + +Today, SDK/MCP clients can create sessions via the public API but must either: +- Hit the backend API directly for messaging and lifecycle control +- Construct AG-UI protocol envelopes manually + +With these additions, the public API becomes a complete session management interface. The `/message` endpoint is the key simplification — clients send plain text and the public API handles all AG-UI plumbing. + +--- + +## Open Question + +The backend API is externally routed on all current deployments (lab OKD, ROSA UAT). All proposed endpoints have direct backend equivalents (except `/message` envelope construction, transcript assembly, and run listing). If the backend remains externally accessible, clients can use it directly — making these public API additions a convenience layer rather than a necessity. + +The value depends on whether Phase 3 (backend internalization) from the [original proposal](acp-public-rest-api.md) proceeds. If the backend route is removed, these endpoints become required for external clients. From cd680dea41f1e1f39d219eea4b08497360233f0f Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Mon, 9 Mar 2026 10:21:47 -0400 Subject: [PATCH 2/8] feat(public-api): add session control endpoints for messaging, output, and lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 7 new endpoints to the public API gateway: - POST /sessions/:id/runs — create an AG-UI run with full control - GET /sessions/:id/runs — list run summaries derived from export events - POST /sessions/:id/message — simplified message sending (auto-generates run/message IDs) - GET /sessions/:id/output — retrieve session output in transcript, compact, or events format - POST /sessions/:id/start — resume a stopped/completed session (422 if already running) - POST /sessions/:id/stop — stop a running session (422 if already completed/failed) - POST /sessions/:id/interrupt — send interrupt signal to cancel current run Key implementation details: - Uses backend export endpoint to fetch AG-UI events (no SSE parsing needed) - Derives run summaries by grouping events by runId - Transcript format extracts assembled messages from MESSAGES_SNAPSHOT events - Compact format merges consecutive TEXT_MESSAGE_CONTENT/TOOL_CALL_ARGS deltas - Lifecycle endpoints check session phase before acting to prevent invalid transitions - Adds DisplayName and Repos fields to SessionResponse - Promotes github.com/google/uuid from indirect to direct dependency --- components/public-api/go.mod | 3 +- .../public-api/handlers/integration_test.go | 8 + components/public-api/handlers/lifecycle.go | 199 ++++++++++ .../public-api/handlers/lifecycle_test.go | 281 +++++++++++++ components/public-api/handlers/output.go | 227 +++++++++++ components/public-api/handlers/output_test.go | 338 ++++++++++++++++ components/public-api/handlers/runs.go | 372 ++++++++++++++++++ components/public-api/handlers/runs_test.go | 342 ++++++++++++++++ components/public-api/handlers/sessions.go | 23 ++ .../public-api/handlers/sessions_test.go | 49 +++ components/public-api/main.go | 19 +- components/public-api/types/dto.go | 111 +++++- 12 files changed, 1961 insertions(+), 11 deletions(-) create mode 100644 components/public-api/handlers/lifecycle.go create mode 100644 components/public-api/handlers/lifecycle_test.go create mode 100644 components/public-api/handlers/output.go create mode 100644 components/public-api/handlers/output_test.go create mode 100644 components/public-api/handlers/runs.go create mode 100644 components/public-api/handlers/runs_test.go diff --git a/components/public-api/go.mod b/components/public-api/go.mod index cf7fd578d..8fb265dac 100644 --- a/components/public-api/go.mod +++ b/components/public-api/go.mod @@ -5,6 +5,7 @@ go 1.24.0 require ( github.com/gin-contrib/cors v1.7.6 github.com/gin-gonic/gin v1.11.0 + github.com/google/uuid v1.6.0 github.com/prometheus/client_golang v1.23.2 github.com/rs/zerolog v1.34.0 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.65.0 @@ -31,7 +32,7 @@ require ( github.com/go-playground/validator/v10 v10.30.1 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/goccy/go-yaml v1.19.2 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect diff --git a/components/public-api/handlers/integration_test.go b/components/public-api/handlers/integration_test.go index 5f1485dd4..73d30af4a 100644 --- a/components/public-api/handlers/integration_test.go +++ b/components/public-api/handlers/integration_test.go @@ -27,6 +27,14 @@ func setupTestRouter() *gin.Engine { v1.POST("/sessions", CreateSession) v1.GET("/sessions/:id", GetSession) v1.DELETE("/sessions/:id", DeleteSession) + + v1.POST("/sessions/:id/runs", CreateRun) + v1.GET("/sessions/:id/runs", GetSessionRuns) + v1.POST("/sessions/:id/message", SendMessage) + v1.GET("/sessions/:id/output", GetSessionOutput) + v1.POST("/sessions/:id/start", StartSession) + v1.POST("/sessions/:id/stop", StopSession) + v1.POST("/sessions/:id/interrupt", InterruptSession) } return r diff --git a/components/public-api/handlers/lifecycle.go b/components/public-api/handlers/lifecycle.go new file mode 100644 index 000000000..d8bd32b5c --- /dev/null +++ b/components/public-api/handlers/lifecycle.go @@ -0,0 +1,199 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + + "ambient-code-public-api/types" + + "github.com/gin-gonic/gin" +) + +// StartSession handles POST /v1/sessions/:id/start +func StartSession(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return // getSessionPhase already wrote the error response + } + + if phase == "running" || phase == "pending" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is already running or pending"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/start", project, sessionID) + resp, err := ProxyRequest(c, http.MethodPost, path, nil) + if err != nil { + log.Printf("Backend request failed for start session %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + c.JSON(http.StatusAccepted, transformSession(backendResp)) +} + +// StopSession handles POST /v1/sessions/:id/stop +func StopSession(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return + } + + if phase == "completed" || phase == "failed" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is not in a running state"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/stop", project, sessionID) + resp, err := ProxyRequest(c, http.MethodPost, path, nil) + if err != nil { + log.Printf("Backend request failed for stop session %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + c.JSON(http.StatusAccepted, transformSession(backendResp)) +} + +// InterruptSession handles POST /v1/sessions/:id/interrupt +func InterruptSession(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/interrupt", project, sessionID) + resp, err := ProxyRequest(c, http.MethodPost, path, []byte("{}")) + if err != nil { + log.Printf("Backend request failed for interrupt session %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + c.JSON(http.StatusOK, types.MessageResponse{Message: "Interrupt signal sent"}) +} + +// getSessionPhase fetches the session from the backend and returns its normalized phase. +// On error, it writes the appropriate error response to the gin context. +func getSessionPhase(c *gin.Context, project, sessionID string) (string, error) { + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s", project, sessionID) + resp, err := ProxyRequest(c, http.MethodGet, path, nil) + if err != nil { + log.Printf("Backend request failed for get session phase %s: %v", sessionID, err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return "", fmt.Errorf("backend unavailable") + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return "", fmt.Errorf("internal server error") + } + + if resp.StatusCode != http.StatusOK { + forwardErrorResponse(c, resp.StatusCode, body) + return "", fmt.Errorf("backend returned %d", resp.StatusCode) + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return "", fmt.Errorf("internal server error") + } + + phase := "" + if status, ok := backendResp["status"].(map[string]interface{}); ok { + if p, ok := status["phase"].(string); ok { + phase = normalizePhase(p) + } + } + + return phase, nil +} diff --git a/components/public-api/handlers/lifecycle_test.go b/components/public-api/handlers/lifecycle_test.go new file mode 100644 index 000000000..9908cf989 --- /dev/null +++ b/components/public-api/handlers/lifecycle_test.go @@ -0,0 +1,281 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "ambient-code-public-api/types" +) + +func makeSessionBackend(t *testing.T, phase string, lifecyclePath string, lifecycleStatus int) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // GET session (for phase check) + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/agentic-sessions/") { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "test-session", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Test task", + }, + "status": map[string]interface{}{ + "phase": phase, + }, + }) + return + } + + // POST lifecycle action + if r.Method == http.MethodPost && strings.Contains(r.URL.Path, lifecyclePath) { + w.WriteHeader(lifecycleStatus) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "test-session", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Test task", + }, + "status": map[string]interface{}{ + "phase": "Running", + }, + }) + return + } + + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "Not found"}) + })) +} + +func TestE2E_StartSession(t *testing.T) { + backend := makeSessionBackend(t, "Completed", "/start", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } + + var response types.SessionResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.ID != "test-session" { + t.Errorf("Expected ID 'test-session', got %q", response.ID) + } +} + +func TestE2E_StartSession_AlreadyRunning(t *testing.T) { + backend := makeSessionBackend(t, "Running", "/start", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_StartSession_AlreadyPending(t *testing.T) { + backend := makeSessionBackend(t, "Pending", "/start", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for pending session, got %d", w.Code) + } +} + +func TestE2E_StopSession(t *testing.T) { + backend := makeSessionBackend(t, "Running", "/stop", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_StopSession_AlreadyCompleted(t *testing.T) { + backend := makeSessionBackend(t, "Completed", "/stop", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_StopSession_AlreadyFailed(t *testing.T) { + backend := makeSessionBackend(t, "Failed", "/stop", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for failed session, got %d", w.Code) + } +} + +func TestE2E_InterruptSession(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("Expected POST, got %s", r.Method) + } + if !strings.Contains(r.URL.Path, "/agui/interrupt") { + t.Errorf("Expected path to contain /agui/interrupt, got %s", r.URL.Path) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"message": "ok"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/interrupt", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.MessageResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Message != "Interrupt signal sent" { + t.Errorf("Expected message 'Interrupt signal sent', got %q", response.Message) + } +} + +func TestE2E_InterruptSession_BackendError(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadGateway) + json.NewEncoder(w).Encode(map[string]string{"error": "Runner unavailable"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/interrupt", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadGateway { + t.Errorf("Expected status 502, got %d", w.Code) + } +} + +func TestE2E_StartSession_InvalidSessionID(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/INVALID/start", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d", w.Code) + } +} + +func TestE2E_StopSession_BackendReturns404(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "Session not found"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/nonexistent/stop", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d", w.Code) + } +} diff --git a/components/public-api/handlers/output.go b/components/public-api/handlers/output.go new file mode 100644 index 000000000..27ab301e3 --- /dev/null +++ b/components/public-api/handlers/output.go @@ -0,0 +1,227 @@ +package handlers + +import ( + "fmt" + "net/http" + "regexp" + + "ambient-code-public-api/types" + + "github.com/gin-gonic/gin" +) + +var uuidRegex = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`) + +// GetSessionOutput handles GET /v1/sessions/:id/output +func GetSessionOutput(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + format := c.DefaultQuery("format", "transcript") + if format != "transcript" && format != "compact" && format != "events" { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid format %q, must be one of: transcript, compact, events", format)}) + return + } + + runIDFilter := c.Query("run_id") + if runIDFilter != "" && !uuidRegex.MatchString(runIDFilter) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid run_id format, must be a valid UUID"}) + return + } + + events, statusCode, err := fetchSessionEvents(c, project, sessionID) + if err != nil { + if statusCode > 0 { + c.JSON(statusCode, gin.H{"error": err.Error()}) + } else { + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + } + return + } + + // Filter by run_id if provided + if runIDFilter != "" { + filtered := make([]map[string]interface{}, 0) + for _, event := range events { + if runID, _ := event["runId"].(string); runID == runIDFilter { + filtered = append(filtered, event) + } + } + events = filtered + } + + switch format { + case "events": + c.JSON(http.StatusOK, types.EventsOutputResponse{ + SessionID: sessionID, + Format: "events", + Events: events, + }) + case "compact": + compacted := compactEvents(events) + c.JSON(http.StatusOK, types.EventsOutputResponse{ + SessionID: sessionID, + Format: "compact", + Events: compacted, + }) + case "transcript": + messages := extractTranscript(events) + c.JSON(http.StatusOK, types.TranscriptOutputResponse{ + SessionID: sessionID, + Format: "transcript", + Messages: messages, + }) + } +} + +// compactEvents merges consecutive TEXT_MESSAGE_CONTENT and TOOL_CALL_ARGS deltas. +func compactEvents(events []map[string]interface{}) []map[string]interface{} { + result := make([]map[string]interface{}, 0, len(events)) + + for i := 0; i < len(events); i++ { + event := events[i] + eventType, _ := event["type"].(string) + + if eventType == "TEXT_MESSAGE_CONTENT" { + messageID, _ := event["messageId"].(string) + merged := copyEvent(event) + delta, _ := merged["delta"].(string) + + for i+1 < len(events) { + next := events[i+1] + nextType, _ := next["type"].(string) + nextMsgID, _ := next["messageId"].(string) + if nextType == "TEXT_MESSAGE_CONTENT" && nextMsgID == messageID { + nextDelta, _ := next["delta"].(string) + delta += nextDelta + i++ + } else { + break + } + } + merged["delta"] = delta + result = append(result, merged) + } else if eventType == "TOOL_CALL_ARGS" { + toolCallID, _ := event["toolCallId"].(string) + merged := copyEvent(event) + delta, _ := merged["delta"].(string) + + for i+1 < len(events) { + next := events[i+1] + nextType, _ := next["type"].(string) + nextTCID, _ := next["toolCallId"].(string) + if nextType == "TOOL_CALL_ARGS" && nextTCID == toolCallID { + nextDelta, _ := next["delta"].(string) + delta += nextDelta + i++ + } else { + break + } + } + merged["delta"] = delta + result = append(result, merged) + } else { + result = append(result, event) + } + } + + return result +} + +// extractTranscript finds the last MESSAGES_SNAPSHOT event and extracts messages. +func extractTranscript(events []map[string]interface{}) []types.TranscriptMessage { + // Find last MESSAGES_SNAPSHOT event + var snapshotMessages []interface{} + for i := len(events) - 1; i >= 0; i-- { + eventType, _ := events[i]["type"].(string) + if eventType == "MESSAGES_SNAPSHOT" { + if msgs, ok := events[i]["messages"].([]interface{}); ok { + snapshotMessages = msgs + } + break + } + } + + if snapshotMessages == nil { + return []types.TranscriptMessage{} + } + + messages := make([]types.TranscriptMessage, 0, len(snapshotMessages)) + for _, raw := range snapshotMessages { + msg, ok := raw.(map[string]interface{}) + if !ok { + continue + } + + tm := types.TranscriptMessage{} + if id, ok := msg["id"].(string); ok { + tm.ID = id + } + if role, ok := msg["role"].(string); ok { + tm.Role = role + } + if content, ok := msg["content"].(string); ok { + tm.Content = content + } + if toolCallID, ok := msg["toolCallId"].(string); ok { + tm.ToolCallID = toolCallID + } + if name, ok := msg["name"].(string); ok { + tm.Name = name + } + if timestamp, ok := msg["timestamp"].(string); ok { + tm.Timestamp = timestamp + } + + // Extract tool calls if present + if toolCalls, ok := msg["toolCalls"].([]interface{}); ok { + for _, tcRaw := range toolCalls { + tc, ok := tcRaw.(map[string]interface{}) + if !ok { + continue + } + ttc := types.TranscriptToolCall{} + if id, ok := tc["id"].(string); ok { + ttc.ID = id + } + if name, ok := tc["name"].(string); ok { + ttc.Name = name + } + if args, ok := tc["args"].(string); ok { + ttc.Args = args + } + if result, ok := tc["result"].(string); ok { + ttc.Result = result + } + if status, ok := tc["status"].(string); ok { + ttc.Status = status + } + if duration, ok := tc["duration"]; ok { + ttc.Duration = toInt64(duration) + } + tm.ToolCalls = append(tm.ToolCalls, ttc) + } + } + + messages = append(messages, tm) + } + + return messages +} + +// copyEvent creates a shallow copy of an event map. +func copyEvent(event map[string]interface{}) map[string]interface{} { + copied := make(map[string]interface{}, len(event)) + for k, v := range event { + copied[k] = v + } + return copied +} diff --git a/components/public-api/handlers/output_test.go b/components/public-api/handlers/output_test.go new file mode 100644 index 000000000..8743c96b2 --- /dev/null +++ b/components/public-api/handlers/output_test.go @@ -0,0 +1,338 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "ambient-code-public-api/types" +) + +func TestCompactEvents(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "TEXT_MESSAGE_START", "runId": "r1", "messageId": "m1", "role": "assistant"}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "r1", "messageId": "m1", "delta": "Hello "}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "r1", "messageId": "m1", "delta": "world"}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "r1", "messageId": "m1", "delta": "!"}, + {"type": "TEXT_MESSAGE_END", "runId": "r1", "messageId": "m1"}, + {"type": "TOOL_CALL_START", "runId": "r1", "toolCallId": "tc1", "toolCallName": "read"}, + {"type": "TOOL_CALL_ARGS", "runId": "r1", "toolCallId": "tc1", "delta": `{"file"`}, + {"type": "TOOL_CALL_ARGS", "runId": "r1", "toolCallId": "tc1", "delta": `: "main.go"}`}, + {"type": "TOOL_CALL_END", "runId": "r1", "toolCallId": "tc1"}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + + compacted := compactEvents(events) + + // Should merge TEXT_MESSAGE_CONTENT: 3 deltas → 1 + // Should merge TOOL_CALL_ARGS: 2 deltas → 1 + // Other events pass through: RUN_STARTED, TEXT_MESSAGE_START, TEXT_MESSAGE_END, TOOL_CALL_START, TOOL_CALL_END, RUN_FINISHED = 6 + // Total: 6 + 1 + 1 = 8 + if len(compacted) != 8 { + t.Fatalf("Expected 8 compacted events, got %d", len(compacted)) + } + + // Find the merged text content event + for _, e := range compacted { + if e["type"] == "TEXT_MESSAGE_CONTENT" { + if e["delta"] != "Hello world!" { + t.Errorf("Expected merged delta 'Hello world!', got %q", e["delta"]) + } + } + if e["type"] == "TOOL_CALL_ARGS" { + if e["delta"] != `{"file": "main.go"}` { + t.Errorf("Expected merged delta '{\"file\": \"main.go\"}', got %q", e["delta"]) + } + } + } +} + +func TestCompactEvents_DifferentMessageIDs(t *testing.T) { + events := []map[string]interface{}{ + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "A"}, + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m2", "delta": "B"}, + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "C"}, + } + + compacted := compactEvents(events) + + // Different messageIds should NOT merge + if len(compacted) != 3 { + t.Fatalf("Expected 3 events (different messageIds), got %d", len(compacted)) + } +} + +func TestCompactEvents_Empty(t *testing.T) { + compacted := compactEvents([]map[string]interface{}{}) + if len(compacted) != 0 { + t.Errorf("Expected 0 events, got %d", len(compacted)) + } +} + +func TestExtractTranscript(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "TEXT_MESSAGE_START", "runId": "r1"}, + {"type": "MESSAGES_SNAPSHOT", "runId": "r1", "messages": []interface{}{ + map[string]interface{}{ + "id": "msg-1", + "role": "user", + "content": "Hello", + }, + map[string]interface{}{ + "id": "msg-2", + "role": "assistant", + "content": "Hi there!", + "toolCalls": []interface{}{ + map[string]interface{}{ + "id": "tc-1", + "name": "read", + "args": `{"file": "main.go"}`, + "status": "completed", + "duration": float64(100), + }, + }, + }, + map[string]interface{}{ + "id": "msg-3", + "role": "tool", + "content": "file content here", + "toolCallId": "tc-1", + "name": "read", + }, + }}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + + messages := extractTranscript(events) + + if len(messages) != 3 { + t.Fatalf("Expected 3 messages, got %d", len(messages)) + } + + if messages[0].Role != "user" || messages[0].Content != "Hello" { + t.Errorf("First message: role=%q content=%q", messages[0].Role, messages[0].Content) + } + + if messages[1].Role != "assistant" || messages[1].Content != "Hi there!" { + t.Errorf("Second message: role=%q content=%q", messages[1].Role, messages[1].Content) + } + if len(messages[1].ToolCalls) != 1 { + t.Fatalf("Expected 1 tool call, got %d", len(messages[1].ToolCalls)) + } + if messages[1].ToolCalls[0].Name != "read" { + t.Errorf("Expected tool call name 'read', got %q", messages[1].ToolCalls[0].Name) + } + if messages[1].ToolCalls[0].Duration != 100 { + t.Errorf("Expected tool call duration 100, got %d", messages[1].ToolCalls[0].Duration) + } + + if messages[2].Role != "tool" || messages[2].ToolCallID != "tc-1" { + t.Errorf("Third message: role=%q toolCallId=%q", messages[2].Role, messages[2].ToolCallID) + } +} + +func TestExtractTranscript_NoSnapshot(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "TEXT_MESSAGE_START", "runId": "r1"}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + + messages := extractTranscript(events) + if len(messages) != 0 { + t.Errorf("Expected 0 messages when no snapshot, got %d", len(messages)) + } +} + +func TestExtractTranscript_UsesLastSnapshot(t *testing.T) { + events := []map[string]interface{}{ + {"type": "MESSAGES_SNAPSHOT", "messages": []interface{}{ + map[string]interface{}{"id": "old", "role": "user", "content": "Old"}, + }}, + {"type": "MESSAGES_SNAPSHOT", "messages": []interface{}{ + map[string]interface{}{"id": "new", "role": "user", "content": "New"}, + }}, + } + + messages := extractTranscript(events) + if len(messages) != 1 { + t.Fatalf("Expected 1 message, got %d", len(messages)) + } + if messages[0].Content != "New" { + t.Errorf("Expected last snapshot content 'New', got %q", messages[0].Content) + } +} + +func makeExportBackend(t *testing.T, events []map[string]interface{}) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + eventsJSON, _ := json.Marshal(events) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "sessionId": "test-session", + "aguiEvents": json.RawMessage(eventsJSON), + }) + })) +} + +func TestE2E_GetOutput_Transcript(t *testing.T) { + events := []map[string]interface{}{ + {"type": "MESSAGES_SNAPSHOT", "messages": []interface{}{ + map[string]interface{}{"id": "m1", "role": "user", "content": "Hello"}, + }}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.TranscriptOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Format != "transcript" { + t.Errorf("Expected format 'transcript', got %q", response.Format) + } + if len(response.Messages) != 1 { + t.Fatalf("Expected 1 message, got %d", len(response.Messages)) + } +} + +func TestE2E_GetOutput_Events(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "r1"}, + {"type": "RUN_FINISHED", "runId": "r1"}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=events", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.EventsOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Format != "events" { + t.Errorf("Expected format 'events', got %q", response.Format) + } + if len(response.Events) != 2 { + t.Errorf("Expected 2 events, got %d", len(response.Events)) + } +} + +func TestE2E_GetOutput_Compact(t *testing.T) { + events := []map[string]interface{}{ + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "A"}, + {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "B"}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=compact", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.EventsOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.Format != "compact" { + t.Errorf("Expected format 'compact', got %q", response.Format) + } + if len(response.Events) != 1 { + t.Errorf("Expected 1 compacted event, got %d", len(response.Events)) + } +} + +func TestE2E_GetOutput_InvalidFormat(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=xml", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected 400 for invalid format, got %d", w.Code) + } +} + +func TestE2E_GetOutput_InvalidRunID(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?run_id=not-a-uuid", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected 400 for invalid run_id, got %d", w.Code) + } +} + +func TestE2E_GetOutput_RunIDFilter(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "11111111-1111-1111-1111-111111111111"}, + {"type": "RUN_FINISHED", "runId": "11111111-1111-1111-1111-111111111111"}, + {"type": "RUN_STARTED", "runId": "22222222-2222-2222-2222-222222222222"}, + {"type": "RUN_FINISHED", "runId": "22222222-2222-2222-2222-222222222222"}, + } + backend := makeExportBackend(t, events) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/output?format=events&run_id=11111111-1111-1111-1111-111111111111", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("Expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.EventsOutputResponse + json.Unmarshal(w.Body.Bytes(), &response) + if len(response.Events) != 2 { + t.Errorf("Expected 2 events after filtering, got %d", len(response.Events)) + } +} diff --git a/components/public-api/handlers/runs.go b/components/public-api/handlers/runs.go new file mode 100644 index 000000000..a917a5a71 --- /dev/null +++ b/components/public-api/handlers/runs.go @@ -0,0 +1,372 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + + "ambient-code-public-api/types" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// CreateRun handles POST /v1/sessions/:id/runs +func CreateRun(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + var req types.CreateRunRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Build AG-UI RunAgentInput messages + messages := make([]map[string]interface{}, len(req.Messages)) + for i, msg := range req.Messages { + m := map[string]interface{}{ + "role": msg.Role, + "content": msg.Content, + } + if msg.ID != "" { + m["id"] = msg.ID + } else { + m["id"] = uuid.New().String() + } + messages[i] = m + } + + messagesJSON, err := json.Marshal(messages) + if err != nil { + log.Printf("Failed to marshal messages: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + // Build RunAgentInput + backendReq := map[string]interface{}{ + "messages": json.RawMessage(messagesJSON), + } + + if req.ThreadID != "" { + backendReq["threadId"] = req.ThreadID + } else { + backendReq["threadId"] = sessionID + } + + if req.RunID != "" { + backendReq["runId"] = req.RunID + } + + reqBody, err := json.Marshal(backendReq) + if err != nil { + log.Printf("Failed to marshal request: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/run", project, sessionID) + resp, err := ProxyRequest(c, http.MethodPost, path, reqBody) + if err != nil { + log.Printf("Backend request failed for create run: %v", err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + runID, _ := backendResp["runId"].(string) + threadID, _ := backendResp["threadId"].(string) + + c.JSON(http.StatusAccepted, types.CreateRunResponse{ + RunID: runID, + ThreadID: threadID, + }) +} + +// SendMessage handles POST /v1/sessions/:id/message +func SendMessage(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + var req types.SendMessageRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + runID := uuid.New().String() + messageID := uuid.New().String() + + messages := []map[string]interface{}{ + { + "id": messageID, + "role": "user", + "content": req.Content, + }, + } + + messagesJSON, err := json.Marshal(messages) + if err != nil { + log.Printf("Failed to marshal messages: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + backendReq := map[string]interface{}{ + "threadId": sessionID, + "runId": runID, + "messages": json.RawMessage(messagesJSON), + } + + reqBody, err := json.Marshal(backendReq) + if err != nil { + log.Printf("Failed to marshal request: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/run", project, sessionID) + resp, err := ProxyRequest(c, http.MethodPost, path, reqBody) + if err != nil { + log.Printf("Backend request failed for send message: %v", err) + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + forwardErrorResponse(c, resp.StatusCode, body) + return + } + + var backendResp map[string]interface{} + if err := json.Unmarshal(body, &backendResp); err != nil { + log.Printf("Failed to parse backend response: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + return + } + + respRunID, _ := backendResp["runId"].(string) + respThreadID, _ := backendResp["threadId"].(string) + + c.JSON(http.StatusAccepted, types.SendMessageResponse{ + RunID: respRunID, + ThreadID: respThreadID, + }) +} + +// GetSessionRuns handles GET /v1/sessions/:id/runs +func GetSessionRuns(c *gin.Context) { + project := GetProject(c) + if !ValidateProjectName(project) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid project name"}) + return + } + sessionID := c.Param("id") + if !ValidateSessionID(sessionID) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session ID"}) + return + } + + events, statusCode, err := fetchSessionEvents(c, project, sessionID) + if err != nil { + if statusCode > 0 { + c.JSON(statusCode, gin.H{"error": err.Error()}) + } else { + c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) + } + return + } + + runs := deriveRunSummaries(events) + + c.JSON(http.StatusOK, types.SessionRunsResponse{ + SessionID: sessionID, + Runs: runs, + }) +} + +// fetchSessionEvents retrieves AG-UI events from the backend export endpoint. +// Returns the events array, an HTTP status code for errors, and any error. +func fetchSessionEvents(c *gin.Context, project, sessionID string) ([]map[string]interface{}, int, error) { + path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/export", project, sessionID) + + resp, err := ProxyRequest(c, http.MethodGet, path, nil) + if err != nil { + log.Printf("Backend request failed for export: %v", err) + return nil, 0, fmt.Errorf("Backend unavailable") + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read export response: %v", err) + return nil, http.StatusInternalServerError, fmt.Errorf("Internal server error") + } + + if resp.StatusCode != http.StatusOK { + // Try to extract error message from backend response + var errorResp map[string]interface{} + if jsonErr := json.Unmarshal(body, &errorResp); jsonErr == nil { + if errMsg, ok := errorResp["error"].(string); ok { + return nil, resp.StatusCode, fmt.Errorf("%s", errMsg) + } + } + return nil, resp.StatusCode, fmt.Errorf("Request failed") + } + + var exportResp struct { + AGUIEvents json.RawMessage `json:"aguiEvents"` + } + if err := json.Unmarshal(body, &exportResp); err != nil { + log.Printf("Failed to parse export response: %v", err) + return nil, http.StatusInternalServerError, fmt.Errorf("Internal server error") + } + + var events []map[string]interface{} + if err := json.Unmarshal(exportResp.AGUIEvents, &events); err != nil { + log.Printf("Failed to parse aguiEvents: %v", err) + return nil, http.StatusInternalServerError, fmt.Errorf("Internal server error") + } + + return events, 0, nil +} + +// deriveRunSummaries groups events by runId and builds run summaries. +func deriveRunSummaries(events []map[string]interface{}) []types.RunSummary { + type runData struct { + summary types.RunSummary + order int + } + + runMap := make(map[string]*runData) + orderCounter := 0 + + for _, event := range events { + runID, _ := event["runId"].(string) + if runID == "" { + continue + } + + rd, exists := runMap[runID] + if !exists { + rd = &runData{ + summary: types.RunSummary{ + RunID: runID, + Status: "running", + }, + order: orderCounter, + } + orderCounter++ + runMap[runID] = rd + } + + rd.summary.EventCount++ + + eventType, _ := event["type"].(string) + timestamp := toInt64(event["timestamp"]) + + switch eventType { + case "RUN_STARTED": + if timestamp > 0 { + rd.summary.StartedAt = timestamp + } + case "RUN_FINISHED": + rd.summary.Status = "completed" + if timestamp > 0 { + rd.summary.FinishedAt = timestamp + } + case "RUN_ERROR": + rd.summary.Status = "error" + if timestamp > 0 { + rd.summary.FinishedAt = timestamp + } + case "TEXT_MESSAGE_START": + role, _ := event["role"].(string) + content, _ := event["content"].(string) + if role == "user" && content != "" && rd.summary.UserMessage == "" { + rd.summary.UserMessage = content + } + } + } + + // Build sorted slice + runs := make([]types.RunSummary, 0, len(runMap)) + sorted := make([]*runData, 0, len(runMap)) + for _, rd := range runMap { + sorted = append(sorted, rd) + } + // Sort by insertion order + for i := 0; i < len(sorted); i++ { + for j := i + 1; j < len(sorted); j++ { + if sorted[j].order < sorted[i].order { + sorted[i], sorted[j] = sorted[j], sorted[i] + } + } + } + for _, rd := range sorted { + runs = append(runs, rd.summary) + } + + return runs +} + +// toInt64 converts a JSON number (float64) to int64. +func toInt64(v interface{}) int64 { + switch n := v.(type) { + case float64: + return int64(n) + case int64: + return n + case json.Number: + i, _ := n.Int64() + return i + default: + return 0 + } +} diff --git a/components/public-api/handlers/runs_test.go b/components/public-api/handlers/runs_test.go new file mode 100644 index 000000000..c1a5ad8d1 --- /dev/null +++ b/components/public-api/handlers/runs_test.go @@ -0,0 +1,342 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "ambient-code-public-api/types" +) + +func TestDeriveRunSummaries(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "run-1", "timestamp": float64(1000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-1", "role": "user", "content": "Hello"}, + {"type": "TEXT_MESSAGE_CONTENT", "runId": "run-1", "delta": "Hi"}, + {"type": "RUN_FINISHED", "runId": "run-1", "timestamp": float64(2000)}, + {"type": "RUN_STARTED", "runId": "run-2", "timestamp": float64(3000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-2", "role": "user", "content": "Fix bug"}, + {"type": "RUN_ERROR", "runId": "run-2", "timestamp": float64(4000)}, + } + + runs := deriveRunSummaries(events) + + if len(runs) != 2 { + t.Fatalf("Expected 2 runs, got %d", len(runs)) + } + + // First run + if runs[0].RunID != "run-1" { + t.Errorf("Expected run-1, got %s", runs[0].RunID) + } + if runs[0].Status != "completed" { + t.Errorf("Expected completed, got %s", runs[0].Status) + } + if runs[0].StartedAt != 1000 { + t.Errorf("Expected started_at 1000, got %d", runs[0].StartedAt) + } + if runs[0].FinishedAt != 2000 { + t.Errorf("Expected finished_at 2000, got %d", runs[0].FinishedAt) + } + if runs[0].UserMessage != "Hello" { + t.Errorf("Expected user message 'Hello', got %q", runs[0].UserMessage) + } + if runs[0].EventCount != 4 { + t.Errorf("Expected 4 events, got %d", runs[0].EventCount) + } + + // Second run + if runs[1].RunID != "run-2" { + t.Errorf("Expected run-2, got %s", runs[1].RunID) + } + if runs[1].Status != "error" { + t.Errorf("Expected error, got %s", runs[1].Status) + } + if runs[1].UserMessage != "Fix bug" { + t.Errorf("Expected user message 'Fix bug', got %q", runs[1].UserMessage) + } +} + +func TestDeriveRunSummaries_Empty(t *testing.T) { + runs := deriveRunSummaries([]map[string]interface{}{}) + if len(runs) != 0 { + t.Errorf("Expected 0 runs, got %d", len(runs)) + } +} + +func TestDeriveRunSummaries_NoRunID(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED"}, + {"type": "TEXT_MESSAGE_START", "role": "user"}, + } + runs := deriveRunSummaries(events) + if len(runs) != 0 { + t.Errorf("Expected 0 runs for events without runId, got %d", len(runs)) + } +} + +func TestDeriveRunSummaries_RunningStatus(t *testing.T) { + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "run-1", "timestamp": float64(1000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-1", "role": "user", "content": "Hello"}, + } + runs := deriveRunSummaries(events) + if len(runs) != 1 { + t.Fatalf("Expected 1 run, got %d", len(runs)) + } + if runs[0].Status != "running" { + t.Errorf("Expected running, got %s", runs[0].Status) + } +} + +func TestToInt64(t *testing.T) { + tests := []struct { + name string + input interface{} + expected int64 + }{ + {"float64", float64(1234), 1234}, + {"int64", int64(5678), 5678}, + {"json.Number", json.Number("9999"), 9999}, + {"string", "not-a-number", 0}, + {"nil", nil, 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := toInt64(tt.input) + if result != tt.expected { + t.Errorf("toInt64(%v) = %d, want %d", tt.input, result, tt.expected) + } + }) + } +} + +func TestE2E_CreateRun(t *testing.T) { + var receivedBody map[string]interface{} + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("Expected POST, got %s", r.Method) + } + if !strings.Contains(r.URL.Path, "/agui/run") { + t.Errorf("Expected path to contain /agui/run, got %s", r.URL.Path) + } + + decoder := json.NewDecoder(r.Body) + decoder.Decode(&receivedBody) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{ + "runId": "test-run-id", + "threadId": "test-session", + }) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{"messages": [{"role": "user", "content": "Hello"}]}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } + + var response types.CreateRunResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.RunID != "test-run-id" { + t.Errorf("Expected run_id 'test-run-id', got %q", response.RunID) + } + if response.ThreadID != "test-session" { + t.Errorf("Expected thread_id 'test-session', got %q", response.ThreadID) + } + + // Verify messages were forwarded + if receivedBody["messages"] == nil { + t.Error("Expected messages in request body") + } +} + +func TestE2E_CreateRun_InvalidBody(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{"messages": []}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_CreateRun_NoMessages(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_GetSessionRuns(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(r.URL.Path, "/export") { + t.Errorf("Expected path to contain /export, got %s", r.URL.Path) + } + + events := []map[string]interface{}{ + {"type": "RUN_STARTED", "runId": "run-abc", "timestamp": float64(1000)}, + {"type": "TEXT_MESSAGE_START", "runId": "run-abc", "role": "user", "content": "Hello"}, + {"type": "RUN_FINISHED", "runId": "run-abc", "timestamp": float64(2000)}, + } + eventsJSON, _ := json.Marshal(events) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "sessionId": "test-session", + "aguiEvents": json.RawMessage(eventsJSON), + }) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/test-session/runs", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + var response types.SessionRunsResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.SessionID != "test-session" { + t.Errorf("Expected session_id 'test-session', got %q", response.SessionID) + } + if len(response.Runs) != 1 { + t.Fatalf("Expected 1 run, got %d", len(response.Runs)) + } + if response.Runs[0].Status != "completed" { + t.Errorf("Expected completed status, got %q", response.Runs[0].Status) + } + if response.Runs[0].UserMessage != "Hello" { + t.Errorf("Expected user message 'Hello', got %q", response.Runs[0].UserMessage) + } +} + +func TestE2E_GetSessionRuns_BackendError(t *testing.T) { + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "Session not found"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/sessions/nonexistent/runs", nil) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + router.ServeHTTP(w, req) + + if w.Code != http.StatusNotFound { + t.Errorf("Expected status 404, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestE2E_SendMessage(t *testing.T) { + var receivedBody map[string]interface{} + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + decoder.Decode(&receivedBody) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{ + "runId": "generated-run-id", + "threadId": "test-session", + }) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/message", + strings.NewReader(`{"content": "Fix the bug please"}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Errorf("Expected status 202, got %d: %s", w.Code, w.Body.String()) + } + + var response types.SendMessageResponse + json.Unmarshal(w.Body.Bytes(), &response) + if response.RunID != "generated-run-id" { + t.Errorf("Expected run_id 'generated-run-id', got %q", response.RunID) + } + if response.ThreadID != "test-session" { + t.Errorf("Expected thread_id 'test-session', got %q", response.ThreadID) + } + + // Verify threadId was set to session ID + if receivedBody["threadId"] != "test-session" { + t.Errorf("Expected threadId 'test-session', got %v", receivedBody["threadId"]) + } + // Verify runId was generated + if receivedBody["runId"] == nil || receivedBody["runId"] == "" { + t.Error("Expected runId to be generated") + } +} + +func TestE2E_SendMessage_EmptyContent(t *testing.T) { + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/message", + strings.NewReader(`{"content": ""}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status 400 for empty content, got %d: %s", w.Code, w.Body.String()) + } +} diff --git a/components/public-api/handlers/sessions.go b/components/public-api/handlers/sessions.go index 758914e18..137d43a20 100644 --- a/components/public-api/handlers/sessions.go +++ b/components/public-api/handlers/sessions.go @@ -264,6 +264,29 @@ func transformSession(data map[string]interface{}) types.SessionResponse { if model, ok := spec["model"].(string); ok { session.Model = model } + if displayName, ok := spec["displayName"].(string); ok { + session.DisplayName = displayName + } + if repos, ok := spec["repos"].([]interface{}); ok { + for _, r := range repos { + repo, ok := r.(map[string]interface{}) + if !ok { + continue + } + sr := types.SessionRepo{} + if input, ok := repo["input"].(map[string]interface{}); ok { + if url, ok := input["url"].(string); ok { + sr.URL = url + } + if branch, ok := input["branch"].(string); ok { + sr.Branch = branch + } + } + if sr.URL != "" { + session.Repos = append(session.Repos, sr) + } + } + } } // Extract status diff --git a/components/public-api/handlers/sessions_test.go b/components/public-api/handlers/sessions_test.go index 43e7ff920..4e8466c53 100644 --- a/components/public-api/handlers/sessions_test.go +++ b/components/public-api/handlers/sessions_test.go @@ -104,6 +104,40 @@ func TestTransformSession(t *testing.T) { Task: "List item task", }, }, + { + name: "Session with displayName and repos", + input: map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "session-with-repos", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Fix the bug", + "displayName": "My Cool Session", + "repos": []interface{}{ + map[string]interface{}{ + "input": map[string]interface{}{ + "url": "https://github.com/org/repo", + "branch": "main", + }, + }, + }, + }, + "status": map[string]interface{}{ + "phase": "Running", + }, + }, + expected: types.SessionResponse{ + ID: "session-with-repos", + Status: "running", + DisplayName: "My Cool Session", + Task: "Fix the bug", + Repos: []types.SessionRepo{ + {URL: "https://github.com/org/repo", Branch: "main"}, + }, + CreatedAt: "2026-01-29T10:00:00Z", + }, + }, { name: "Empty session", input: map[string]interface{}{}, @@ -141,6 +175,21 @@ func TestTransformSession(t *testing.T) { if result.Error != tt.expected.Error { t.Errorf("Error = %q, want %q", result.Error, tt.expected.Error) } + if result.DisplayName != tt.expected.DisplayName { + t.Errorf("DisplayName = %q, want %q", result.DisplayName, tt.expected.DisplayName) + } + if len(result.Repos) != len(tt.expected.Repos) { + t.Errorf("Repos count = %d, want %d", len(result.Repos), len(tt.expected.Repos)) + } else { + for i, r := range result.Repos { + if r.URL != tt.expected.Repos[i].URL { + t.Errorf("Repos[%d].URL = %q, want %q", i, r.URL, tt.expected.Repos[i].URL) + } + if r.Branch != tt.expected.Repos[i].Branch { + t.Errorf("Repos[%d].Branch = %q, want %q", i, r.Branch, tt.expected.Repos[i].Branch) + } + } + } }) } } diff --git a/components/public-api/main.go b/components/public-api/main.go index fb20ac418..1aeb39938 100644 --- a/components/public-api/main.go +++ b/components/public-api/main.go @@ -96,6 +96,21 @@ func main() { v1.POST("/sessions", handlers.CreateSession) v1.GET("/sessions/:id", handlers.GetSession) v1.DELETE("/sessions/:id", handlers.DeleteSession) + + // Runs + v1.POST("/sessions/:id/runs", handlers.CreateRun) + v1.GET("/sessions/:id/runs", handlers.GetSessionRuns) + + // Messaging + v1.POST("/sessions/:id/message", handlers.SendMessage) + + // Output + v1.GET("/sessions/:id/output", handlers.GetSessionOutput) + + // Lifecycle + v1.POST("/sessions/:id/start", handlers.StartSession) + v1.POST("/sessions/:id/stop", handlers.StopSession) + v1.POST("/sessions/:id/interrupt", handlers.InterruptSession) } // Get port from environment or default to 8081 @@ -143,8 +158,8 @@ func getAllowedOrigins() []string { // Default: allow common development origins return []string{ - "http://localhost:3000", // Next.js dev server - "http://localhost:8080", // Frontend in kind + "http://localhost:3000", // Next.js dev server + "http://localhost:8080", // Frontend in kind "https://*.apps-crc.testing", // CRC routes } } diff --git a/components/public-api/types/dto.go b/components/public-api/types/dto.go index 9faafa8d1..d6474758c 100644 --- a/components/public-api/types/dto.go +++ b/components/public-api/types/dto.go @@ -2,14 +2,16 @@ package types // SessionResponse is the simplified session response for the public API type SessionResponse struct { - ID string `json:"id"` - Status string `json:"status"` // "pending", "running", "completed", "failed" - Task string `json:"task"` - Model string `json:"model,omitempty"` - CreatedAt string `json:"createdAt"` - CompletedAt string `json:"completedAt,omitempty"` - Result string `json:"result,omitempty"` - Error string `json:"error,omitempty"` + ID string `json:"id"` + Status string `json:"status"` // "pending", "running", "completed", "failed" + DisplayName string `json:"display_name,omitempty"` + Task string `json:"task"` + Model string `json:"model,omitempty"` + Repos []SessionRepo `json:"repos,omitempty"` + CreatedAt string `json:"createdAt"` + CompletedAt string `json:"completedAt,omitempty"` + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` } // SessionListResponse is the response for listing sessions @@ -31,6 +33,99 @@ type Repo struct { Branch string `json:"branch,omitempty"` } +// CreateRunRequest is the request body for creating an AG-UI run +type CreateRunRequest struct { + RunID string `json:"run_id,omitempty"` + ThreadID string `json:"thread_id,omitempty"` + Messages []RunMessage `json:"messages" binding:"required,min=1"` +} + +// RunMessage is a message in a run request +type RunMessage struct { + ID string `json:"id,omitempty"` + Role string `json:"role" binding:"required"` + Content string `json:"content" binding:"required"` +} + +// CreateRunResponse is the response for creating a run +type CreateRunResponse struct { + RunID string `json:"run_id"` + ThreadID string `json:"thread_id"` +} + +// SessionRunsResponse is the response for listing runs in a session +type SessionRunsResponse struct { + SessionID string `json:"session_id"` + Runs []RunSummary `json:"runs"` +} + +// RunSummary is a summary of a single run +type RunSummary struct { + RunID string `json:"run_id"` + StartedAt int64 `json:"started_at,omitempty"` + FinishedAt int64 `json:"finished_at,omitempty"` + Status string `json:"status"` + UserMessage string `json:"user_message,omitempty"` + EventCount int `json:"event_count"` +} + +// SendMessageRequest is the request body for sending a message to a session +type SendMessageRequest struct { + Content string `json:"content" binding:"required"` +} + +// SendMessageResponse is the response for sending a message +type SendMessageResponse struct { + RunID string `json:"run_id"` + ThreadID string `json:"thread_id"` +} + +// TranscriptOutputResponse is the response for transcript format output +type TranscriptOutputResponse struct { + SessionID string `json:"session_id"` + Format string `json:"format"` + Messages []TranscriptMessage `json:"messages"` +} + +// TranscriptMessage is a single message in transcript output +type TranscriptMessage struct { + ID string `json:"id,omitempty"` + Role string `json:"role"` + Content string `json:"content,omitempty"` + ToolCalls []TranscriptToolCall `json:"tool_calls,omitempty"` + ToolCallID string `json:"tool_call_id,omitempty"` + Name string `json:"name,omitempty"` + Timestamp string `json:"timestamp,omitempty"` +} + +// TranscriptToolCall is a tool call in a transcript message +type TranscriptToolCall struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Args string `json:"args,omitempty"` + Result string `json:"result,omitempty"` + Status string `json:"status,omitempty"` + Duration int64 `json:"duration,omitempty"` +} + +// EventsOutputResponse is the response for events/compact format output +type EventsOutputResponse struct { + SessionID string `json:"session_id"` + Format string `json:"format"` + Events []map[string]interface{} `json:"events"` +} + +// SessionRepo represents a repository configured for a session +type SessionRepo struct { + URL string `json:"url"` + Branch string `json:"branch,omitempty"` +} + +// MessageResponse is a simple message response +type MessageResponse struct { + Message string `json:"message"` +} + // ErrorResponse is a standard error response type ErrorResponse struct { Error string `json:"error"` From 598b2b57f709c85376dc3ed2d9e951aeedd12ae9 Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Mon, 9 Mar 2026 10:50:29 -0400 Subject: [PATCH 3/8] fix(public-api): harden error handling, validation, and phase guards - Filter forwarded errors to only expose the "error" field, preventing internal detail leakage (namespaces, K8s traces) - Add phase guards to lifecycle and messaging endpoints (defense-in-depth) - Standardize JSON casing: snake_case in public API, camelCase to backend - Lowercase unknown phases in normalizePhase to maintain API contract - Validate run_id as UUID, format as enum, session IDs as K8s names - Stable sort for run summaries by insertion order --- components/public-api/handlers/lifecycle.go | 20 ++++ components/public-api/handlers/output.go | 14 +-- components/public-api/handlers/proxy.go | 1 + components/public-api/handlers/runs.go | 58 +++++++--- components/public-api/handlers/runs_test.go | 103 +++++++++++++++++- components/public-api/handlers/sessions.go | 22 ++-- .../public-api/handlers/sessions_test.go | 63 ++++++++++- .../public-api/observability/logging.go | 1 + components/public-api/types/dto.go | 11 +- .../proposals/public-api-session-control.md | 2 +- 10 files changed, 250 insertions(+), 45 deletions(-) diff --git a/components/public-api/handlers/lifecycle.go b/components/public-api/handlers/lifecycle.go index d8bd32b5c..ad41f8880 100644 --- a/components/public-api/handlers/lifecycle.go +++ b/components/public-api/handlers/lifecycle.go @@ -13,6 +13,10 @@ import ( ) // StartSession handles POST /v1/sessions/:id/start +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. func StartSession(c *gin.Context) { project := GetProject(c) if !ValidateProjectName(project) { @@ -30,6 +34,12 @@ func StartSession(c *gin.Context) { return // getSessionPhase already wrote the error response } + if phase == "" { + log.Printf("Session %s has no phase, treating as unknown", sessionID) + c.JSON(http.StatusConflict, gin.H{"error": "Session state is unknown"}) + return + } + if phase == "running" || phase == "pending" { c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is already running or pending"}) return @@ -67,6 +77,10 @@ func StartSession(c *gin.Context) { } // StopSession handles POST /v1/sessions/:id/stop +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. func StopSession(c *gin.Context) { project := GetProject(c) if !ValidateProjectName(project) { @@ -84,6 +98,12 @@ func StopSession(c *gin.Context) { return } + if phase == "" { + log.Printf("Session %s has no phase, treating as unknown", sessionID) + c.JSON(http.StatusConflict, gin.H{"error": "Session state is unknown"}) + return + } + if phase == "completed" || phase == "failed" { c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is not in a running state"}) return diff --git a/components/public-api/handlers/output.go b/components/public-api/handlers/output.go index 27ab301e3..647c92f5c 100644 --- a/components/public-api/handlers/output.go +++ b/components/public-api/handlers/output.go @@ -3,15 +3,13 @@ package handlers import ( "fmt" "net/http" - "regexp" "ambient-code-public-api/types" "github.com/gin-gonic/gin" + "github.com/google/uuid" ) -var uuidRegex = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`) - // GetSessionOutput handles GET /v1/sessions/:id/output func GetSessionOutput(c *gin.Context) { project := GetProject(c) @@ -32,15 +30,17 @@ func GetSessionOutput(c *gin.Context) { } runIDFilter := c.Query("run_id") - if runIDFilter != "" && !uuidRegex.MatchString(runIDFilter) { - c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid run_id format, must be a valid UUID"}) - return + if runIDFilter != "" { + if _, err := uuid.Parse(runIDFilter); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid run_id format, must be a valid UUID"}) + return + } } events, statusCode, err := fetchSessionEvents(c, project, sessionID) if err != nil { if statusCode > 0 { - c.JSON(statusCode, gin.H{"error": err.Error()}) + c.JSON(statusCode, gin.H{"error": "Request failed"}) } else { c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) } diff --git a/components/public-api/handlers/proxy.go b/components/public-api/handlers/proxy.go index b4e8c6ad7..cf6b58f2d 100644 --- a/components/public-api/handlers/proxy.go +++ b/components/public-api/handlers/proxy.go @@ -1,3 +1,4 @@ +// Package handlers implements HTTP request handlers for the public API gateway. package handlers import ( diff --git a/components/public-api/handlers/runs.go b/components/public-api/handlers/runs.go index a917a5a71..63a5fcfc8 100644 --- a/components/public-api/handlers/runs.go +++ b/components/public-api/handlers/runs.go @@ -6,6 +6,7 @@ import ( "io" "log" "net/http" + "slices" "ambient-code-public-api/types" @@ -14,6 +15,10 @@ import ( ) // CreateRun handles POST /v1/sessions/:id/runs +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. func CreateRun(c *gin.Context) { project := GetProject(c) if !ValidateProjectName(project) { @@ -26,9 +31,19 @@ func CreateRun(c *gin.Context) { return } + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return // getSessionPhase already wrote the error response + } + + if phase != "running" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is not in a running state"}) + return + } + var req types.CreateRunRequest if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) return } @@ -114,6 +129,10 @@ func CreateRun(c *gin.Context) { } // SendMessage handles POST /v1/sessions/:id/message +// +// Defense-in-depth: The gateway fetches the session phase before forwarding. +// The backend also validates phase transitions, so this is a redundant guard +// that provides faster feedback and reduces unnecessary backend writes. func SendMessage(c *gin.Context) { project := GetProject(c) if !ValidateProjectName(project) { @@ -126,9 +145,19 @@ func SendMessage(c *gin.Context) { return } + phase, err := getSessionPhase(c, project, sessionID) + if err != nil { + return // getSessionPhase already wrote the error response + } + + if phase != "running" { + c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "Session is not in a running state"}) + return + } + var req types.SendMessageRequest if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) return } @@ -216,7 +245,7 @@ func GetSessionRuns(c *gin.Context) { events, statusCode, err := fetchSessionEvents(c, project, sessionID) if err != nil { if statusCode > 0 { - c.JSON(statusCode, gin.H{"error": err.Error()}) + c.JSON(statusCode, gin.H{"error": "Request failed"}) } else { c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) } @@ -239,14 +268,14 @@ func fetchSessionEvents(c *gin.Context, project, sessionID string) ([]map[string resp, err := ProxyRequest(c, http.MethodGet, path, nil) if err != nil { log.Printf("Backend request failed for export: %v", err) - return nil, 0, fmt.Errorf("Backend unavailable") + return nil, 0, fmt.Errorf("backend unavailable") } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { log.Printf("Failed to read export response: %v", err) - return nil, http.StatusInternalServerError, fmt.Errorf("Internal server error") + return nil, http.StatusInternalServerError, fmt.Errorf("internal server error") } if resp.StatusCode != http.StatusOK { @@ -257,7 +286,7 @@ func fetchSessionEvents(c *gin.Context, project, sessionID string) ([]map[string return nil, resp.StatusCode, fmt.Errorf("%s", errMsg) } } - return nil, resp.StatusCode, fmt.Errorf("Request failed") + return nil, resp.StatusCode, fmt.Errorf("request failed") } var exportResp struct { @@ -265,13 +294,13 @@ func fetchSessionEvents(c *gin.Context, project, sessionID string) ([]map[string } if err := json.Unmarshal(body, &exportResp); err != nil { log.Printf("Failed to parse export response: %v", err) - return nil, http.StatusInternalServerError, fmt.Errorf("Internal server error") + return nil, http.StatusInternalServerError, fmt.Errorf("internal server error") } var events []map[string]interface{} if err := json.Unmarshal(exportResp.AGUIEvents, &events); err != nil { log.Printf("Failed to parse aguiEvents: %v", err) - return nil, http.StatusInternalServerError, fmt.Errorf("Internal server error") + return nil, http.StatusInternalServerError, fmt.Errorf("internal server error") } return events, 0, nil @@ -336,19 +365,14 @@ func deriveRunSummaries(events []map[string]interface{}) []types.RunSummary { } // Build sorted slice - runs := make([]types.RunSummary, 0, len(runMap)) sorted := make([]*runData, 0, len(runMap)) for _, rd := range runMap { sorted = append(sorted, rd) } - // Sort by insertion order - for i := 0; i < len(sorted); i++ { - for j := i + 1; j < len(sorted); j++ { - if sorted[j].order < sorted[i].order { - sorted[i], sorted[j] = sorted[j], sorted[i] - } - } - } + slices.SortFunc(sorted, func(a, b *runData) int { + return a.order - b.order + }) + runs := make([]types.RunSummary, 0, len(sorted)) for _, rd := range sorted { runs = append(runs, rd.summary) } diff --git a/components/public-api/handlers/runs_test.go b/components/public-api/handlers/runs_test.go index c1a5ad8d1..4d205d2fe 100644 --- a/components/public-api/handlers/runs_test.go +++ b/components/public-api/handlers/runs_test.go @@ -117,6 +117,18 @@ func TestToInt64(t *testing.T) { func TestE2E_CreateRun(t *testing.T) { var receivedBody map[string]interface{} backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // GET session (for phase check) + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/agentic-sessions/") { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{"name": "test-session"}, + "status": map[string]interface{}{"phase": "Running"}, + }) + return + } + if r.Method != http.MethodPost { t.Errorf("Expected POST, got %s", r.Method) } @@ -127,7 +139,6 @@ func TestE2E_CreateRun(t *testing.T) { decoder := json.NewDecoder(r.Body) decoder.Decode(&receivedBody) - w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(map[string]string{ "runId": "test-run-id", @@ -168,7 +179,26 @@ func TestE2E_CreateRun(t *testing.T) { } } +func makeRunningSessionBackend(t *testing.T) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{"name": "test-session"}, + "status": map[string]interface{}{"phase": "Running"}, + }) + })) +} + func TestE2E_CreateRun_InvalidBody(t *testing.T) { + backend := makeRunningSessionBackend(t) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + router := setupTestRouter() w := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", @@ -184,6 +214,13 @@ func TestE2E_CreateRun_InvalidBody(t *testing.T) { } func TestE2E_CreateRun_NoMessages(t *testing.T) { + backend := makeRunningSessionBackend(t) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + router := setupTestRouter() w := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", @@ -198,6 +235,28 @@ func TestE2E_CreateRun_NoMessages(t *testing.T) { } } +func TestE2E_CreateRun_SessionNotRunning(t *testing.T) { + backend := makeSessionBackend(t, "Completed", "/agui/run", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/runs", + strings.NewReader(`{"messages": [{"role": "user", "content": "Hello"}]}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for completed session, got %d: %s", w.Code, w.Body.String()) + } +} + func TestE2E_GetSessionRuns(t *testing.T) { backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.URL.Path, "/export") { @@ -278,10 +337,21 @@ func TestE2E_GetSessionRuns_BackendError(t *testing.T) { func TestE2E_SendMessage(t *testing.T) { var receivedBody map[string]interface{} backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // GET session (for phase check) + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/agentic-sessions/") { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "metadata": map[string]interface{}{"name": "test-session"}, + "status": map[string]interface{}{"phase": "Running"}, + }) + return + } + decoder := json.NewDecoder(r.Body) decoder.Decode(&receivedBody) - w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(map[string]string{ "runId": "generated-run-id", @@ -327,6 +397,13 @@ func TestE2E_SendMessage(t *testing.T) { } func TestE2E_SendMessage_EmptyContent(t *testing.T) { + backend := makeRunningSessionBackend(t) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + router := setupTestRouter() w := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/message", @@ -340,3 +417,25 @@ func TestE2E_SendMessage_EmptyContent(t *testing.T) { t.Errorf("Expected status 400 for empty content, got %d: %s", w.Code, w.Body.String()) } } + +func TestE2E_SendMessage_SessionNotRunning(t *testing.T) { + backend := makeSessionBackend(t, "Failed", "/agui/run", http.StatusOK) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions/test-session/message", + strings.NewReader(`{"content": "Hello"}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("Expected status 422 for failed session, got %d: %s", w.Code, w.Body.String()) + } +} diff --git a/components/public-api/handlers/sessions.go b/components/public-api/handlers/sessions.go index 137d43a20..f2b80ad88 100644 --- a/components/public-api/handlers/sessions.go +++ b/components/public-api/handlers/sessions.go @@ -6,6 +6,7 @@ import ( "io" "log" "net/http" + "strings" "ambient-code-public-api/types" @@ -117,7 +118,7 @@ func CreateSession(c *gin.Context) { var req types.CreateSessionRequest if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) return } @@ -221,17 +222,19 @@ func DeleteSession(c *gin.Context) { forwardErrorResponse(c, resp.StatusCode, body) } -// forwardErrorResponse forwards backend error with consistent JSON format +// forwardErrorResponse forwards backend error with consistent JSON format. +// SECURITY: Only forwards the "error" field to prevent leaking internal details. func forwardErrorResponse(c *gin.Context, statusCode int, body []byte) { - // Try to parse as JSON error response + // Try to parse as JSON and extract only the "error" field var errorResp map[string]interface{} if err := json.Unmarshal(body, &errorResp); err == nil { - // Backend returned valid JSON, forward it - c.JSON(statusCode, errorResp) - return + if errMsg, ok := errorResp["error"].(string); ok { + c.JSON(statusCode, gin.H{"error": errMsg}) + return + } } - // Backend returned non-JSON, wrap in standard error format + // Backend returned non-JSON or no "error" field, wrap in standard error format c.JSON(statusCode, gin.H{"error": "Request failed"}) } @@ -313,7 +316,8 @@ func transformSession(data map[string]interface{}) types.SessionResponse { return session } -// normalizePhase converts K8s phase to simplified status +// normalizePhase converts K8s phase to simplified lowercase status. +// The public API contract guarantees status values are always lowercase. func normalizePhase(phase string) string { switch phase { case "Pending", "Creating", "Initializing": @@ -325,6 +329,6 @@ func normalizePhase(phase string) string { case "Failed", "Error": return "failed" default: - return phase + return strings.ToLower(phase) } } diff --git a/components/public-api/handlers/sessions_test.go b/components/public-api/handlers/sessions_test.go index 4e8466c53..b0fd8effd 100644 --- a/components/public-api/handlers/sessions_test.go +++ b/components/public-api/handlers/sessions_test.go @@ -1,6 +1,7 @@ package handlers import ( + "encoding/json" "net/http/httptest" "testing" @@ -208,7 +209,8 @@ func TestNormalizePhase(t *testing.T) { {"Succeeded", "completed"}, {"Failed", "failed"}, {"Error", "failed"}, - {"Unknown", "Unknown"}, + {"Unknown", "unknown"}, + {"Stopping", "stopping"}, } for _, tt := range tests { @@ -266,6 +268,20 @@ func TestForwardErrorResponse(t *testing.T) { expectedStatus: 500, expectJSON: true, // Should be wrapped in generic JSON }, + { + name: "Backend returns JSON with extra internal fields", + statusCode: 500, + body: []byte(`{"error": "Session failed", "internal_trace": "k8s.io/xyz:123", "namespace": "secret-ns"}`), + expectedStatus: 500, + expectJSON: true, // Should only forward "error" field + }, + { + name: "Backend returns JSON without error field", + statusCode: 500, + body: []byte(`{"message": "some internal detail"}`), + expectedStatus: 500, + expectJSON: true, // Should fall back to generic error + }, } for _, tt := range tests { @@ -290,6 +306,51 @@ func TestForwardErrorResponse(t *testing.T) { } } +func TestForwardErrorResponse_FiltersInternalFields(t *testing.T) { + gin.SetMode(gin.TestMode) + + // Backend returns JSON with extra internal fields that should be stripped + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/", nil) + + forwardErrorResponse(c, 500, []byte(`{"error": "Session failed", "internal_trace": "k8s.io/xyz:123", "namespace": "secret-ns"}`)) + + var response map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &response) + + if response["error"] != "Session failed" { + t.Errorf("Expected error 'Session failed', got %v", response["error"]) + } + if _, exists := response["internal_trace"]; exists { + t.Error("Expected internal_trace to be stripped from response") + } + if _, exists := response["namespace"]; exists { + t.Error("Expected namespace to be stripped from response") + } +} + +func TestForwardErrorResponse_NoErrorField(t *testing.T) { + gin.SetMode(gin.TestMode) + + // Backend returns JSON without an "error" field + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/", nil) + + forwardErrorResponse(c, 500, []byte(`{"message": "some internal detail"}`)) + + var response map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &response) + + if response["error"] != "Request failed" { + t.Errorf("Expected generic error 'Request failed', got %v", response["error"]) + } + if _, exists := response["message"]; exists { + t.Error("Expected message to not be forwarded") + } +} + func TestTransformSession_TypeSafety(t *testing.T) { // Test that transformSession handles incorrect types gracefully tests := []struct { diff --git a/components/public-api/observability/logging.go b/components/public-api/observability/logging.go index 03acda5f5..d424c5af1 100644 --- a/components/public-api/observability/logging.go +++ b/components/public-api/observability/logging.go @@ -1,3 +1,4 @@ +// Package observability provides structured logging and tracing for the public API. package observability import ( diff --git a/components/public-api/types/dto.go b/components/public-api/types/dto.go index d6474758c..c73498546 100644 --- a/components/public-api/types/dto.go +++ b/components/public-api/types/dto.go @@ -1,3 +1,4 @@ +// Package types defines data transfer objects for the public API. package types // SessionResponse is the simplified session response for the public API @@ -8,8 +9,8 @@ type SessionResponse struct { Task string `json:"task"` Model string `json:"model,omitempty"` Repos []SessionRepo `json:"repos,omitempty"` - CreatedAt string `json:"createdAt"` - CompletedAt string `json:"completedAt,omitempty"` + CreatedAt string `json:"created_at"` + CompletedAt string `json:"completed_at,omitempty"` Result string `json:"result,omitempty"` Error string `json:"error,omitempty"` } @@ -125,9 +126,3 @@ type SessionRepo struct { type MessageResponse struct { Message string `json:"message"` } - -// ErrorResponse is a standard error response -type ErrorResponse struct { - Error string `json:"error"` - Message string `json:"message,omitempty"` -} diff --git a/docs/internal/proposals/public-api-session-control.md b/docs/internal/proposals/public-api-session-control.md index cc2da5e20..d7dd1f398 100644 --- a/docs/internal/proposals/public-api-session-control.md +++ b/docs/internal/proposals/public-api-session-control.md @@ -34,8 +34,8 @@ The public API currently exposes only basic CRUD: | Method | Endpoint | Description | |--------|----------|-------------| -| POST | `/v1/sessions/{id}/message` | **Simplified send.** Accepts `{"content": "..."}`, constructs the AG-UI `RunAgentInput` envelope server-side (generates run ID, thread ID, message ID). Proxies to backend `/agui/run`. | | POST | `/v1/sessions/{id}/runs` | **Raw AG-UI run.** Accepts full `RunAgentInput` with caller-provided run/thread IDs and messages array. Direct proxy to backend `/agui/run`. | +| POST | `/v1/sessions/{id}/message` | **Simplified send.** Accepts `{"content": "..."}`, constructs the AG-UI `RunAgentInput` envelope server-side (generates run ID, thread ID, message ID). Proxies to backend `/agui/run`. | ### Output Retrieval From 034437828eb0e5dbc803aeaabb9c3e53766b4dfb Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Mon, 9 Mar 2026 10:55:13 -0400 Subject: [PATCH 4/8] docs(public-api): fix OpenAPI spec to match implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix JSON casing: createdAt/completedAt → created_at/completed_at - Add 422 responses to POST /message and POST /runs (session not running) - Add 409 responses to POST /start and POST /stop (unknown phase) - Remove phantom 422 from POST /interrupt (no phase guard in code) - Fix start/stop descriptions referencing wrong status code (409 → 422) - Remove unused message field from ErrorResponse schema --- components/public-api/openapi.yaml | 49 +++++++++++++++++++----------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/components/public-api/openapi.yaml b/components/public-api/openapi.yaml index 3de09be21..c963df895 100644 --- a/components/public-api/openapi.yaml +++ b/components/public-api/openapi.yaml @@ -136,6 +136,12 @@ paths: $ref: '#/components/responses/BadRequest' '401': $ref: '#/components/responses/Unauthorized' + '422': + description: Session is not in a running state + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '502': $ref: '#/components/responses/BadGateway' @@ -239,6 +245,12 @@ paths: $ref: '#/components/responses/BadRequest' '401': $ref: '#/components/responses/Unauthorized' + '422': + description: Session is not in a running state + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '502': $ref: '#/components/responses/BadGateway' @@ -272,8 +284,9 @@ paths: summary: Start (resume) a session description: > Resumes a stopped or completed session. The session transitions - back to Running state and can accept new messages. Returns 409 - if the session is already running or pending. + back to Running state and can accept new messages. Returns 422 + if the session is already running or pending, or 409 if the + session state is unknown. operationId: startSession parameters: - $ref: '#/components/parameters/ProjectHeader' @@ -291,6 +304,12 @@ paths: $ref: '#/components/responses/Unauthorized' '404': $ref: '#/components/responses/NotFound' + '409': + description: Session state is unknown + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '422': description: Session is already running or pending content: @@ -307,8 +326,9 @@ paths: Stops a running session. The session's pod is terminated and the session transitions to a completed state. This is a session-level lifecycle action — use interrupt instead to cancel only the - current run without killing the session. Returns 409 if the - session is already stopped, completed, or failed. + current run without killing the session. Returns 422 if the + session is already completed or failed, or 409 if the session + state is unknown. operationId: stopSession parameters: - $ref: '#/components/parameters/ProjectHeader' @@ -326,6 +346,12 @@ paths: $ref: '#/components/responses/Unauthorized' '404': $ref: '#/components/responses/NotFound' + '409': + description: Session state is unknown + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '422': description: Session is not in a running state content: @@ -360,12 +386,6 @@ paths: $ref: '#/components/responses/Unauthorized' '404': $ref: '#/components/responses/NotFound' - '422': - description: Session not in a state that can be interrupted - content: - application/json: - schema: - $ref: '#/components/schemas/ErrorResponse' '502': $ref: '#/components/responses/BadGateway' @@ -417,10 +437,10 @@ components: description: Repositories configured for this session items: $ref: '#/components/schemas/SessionRepo' - createdAt: + created_at: type: string format: date-time - completedAt: + completed_at: type: string format: date-time result: @@ -445,9 +465,6 @@ components: task: type: string description: The initial prompt / task for the agent - display_name: - type: string - description: Human-readable name for the session model: type: string repos: @@ -727,8 +744,6 @@ components: properties: error: type: string - message: - type: string responses: BadRequest: From 6f332f89e385a38d7d679942c2b1f9bca84e80d4 Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Mon, 9 Mar 2026 10:55:55 -0400 Subject: [PATCH 5/8] feat(public-api): support display_name in session creation Add display_name to CreateSessionRequest DTO and forward it as displayName (camelCase) to the backend. Update OpenAPI spec to document the field. --- .../public-api/handlers/integration_test.go | 35 +++++++++++++++++++ components/public-api/handlers/sessions.go | 3 ++ components/public-api/openapi.yaml | 3 ++ components/public-api/types/dto.go | 7 ++-- 4 files changed, 45 insertions(+), 3 deletions(-) diff --git a/components/public-api/handlers/integration_test.go b/components/public-api/handlers/integration_test.go index 73d30af4a..6279258bf 100644 --- a/components/public-api/handlers/integration_test.go +++ b/components/public-api/handlers/integration_test.go @@ -122,6 +122,41 @@ func TestE2E_CreateSession(t *testing.T) { } } +func TestE2E_CreateSession_WithDisplayName(t *testing.T) { + var receivedBody map[string]interface{} + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + decoder.Decode(&receivedBody) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]string{"name": "session-123"}) + })) + defer backend.Close() + + originalURL := BackendURL + BackendURL = backend.URL + defer func() { BackendURL = originalURL }() + + router := setupTestRouter() + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/sessions", + strings.NewReader(`{"task": "Fix the bug", "display_name": "Bug Fix Session"}`)) + req.Header.Set("Authorization", "Bearer test-token") + req.Header.Set("X-Ambient-Project", "test-project") + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + if w.Code != http.StatusCreated { + t.Errorf("Expected status 201, got %d: %s", w.Code, w.Body.String()) + } + + // Verify display_name was forwarded as displayName (camelCase) to backend + if receivedBody["displayName"] != "Bug Fix Session" { + t.Errorf("Expected displayName 'Bug Fix Session' in backend request, got %v", receivedBody["displayName"]) + } +} + func TestE2E_BackendReturns500(t *testing.T) { backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") diff --git a/components/public-api/handlers/sessions.go b/components/public-api/handlers/sessions.go index f2b80ad88..fe0dbc9e3 100644 --- a/components/public-api/handlers/sessions.go +++ b/components/public-api/handlers/sessions.go @@ -126,6 +126,9 @@ func CreateSession(c *gin.Context) { backendReq := map[string]interface{}{ "prompt": req.Task, } + if req.DisplayName != "" { + backendReq["displayName"] = req.DisplayName + } if req.Model != "" { backendReq["model"] = req.Model } diff --git a/components/public-api/openapi.yaml b/components/public-api/openapi.yaml index c963df895..4780dc7f3 100644 --- a/components/public-api/openapi.yaml +++ b/components/public-api/openapi.yaml @@ -465,6 +465,9 @@ components: task: type: string description: The initial prompt / task for the agent + display_name: + type: string + description: Human-readable name for the session model: type: string repos: diff --git a/components/public-api/types/dto.go b/components/public-api/types/dto.go index c73498546..24b055471 100644 --- a/components/public-api/types/dto.go +++ b/components/public-api/types/dto.go @@ -23,9 +23,10 @@ type SessionListResponse struct { // CreateSessionRequest is the request body for creating a session type CreateSessionRequest struct { - Task string `json:"task" binding:"required"` - Model string `json:"model,omitempty"` - Repos []Repo `json:"repos,omitempty"` + Task string `json:"task" binding:"required"` + DisplayName string `json:"display_name,omitempty"` + Model string `json:"model,omitempty"` + Repos []Repo `json:"repos,omitempty"` } // Repo represents a repository configuration From 61e4634fd7e388ceab61923a0de5b8481cb0c754 Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Mon, 9 Mar 2026 13:08:56 -0400 Subject: [PATCH 6/8] fix(public-api): return cancel func from ProxyRequest to prevent context canceled errors ProxyRequest deferred context cancellation before returning the response, causing callers to get "context canceled" when reading large response bodies (e.g. list sessions). Small responses worked by chance because the body was already buffered in the transport layer. Return the cancel function to callers so cancellation happens after the response body has been read. All callers and tests updated accordingly. --- components/public-api/handlers/lifecycle.go | 12 ++++++++---- components/public-api/handlers/proxy.go | 17 ++++++++++------- components/public-api/handlers/proxy_test.go | 3 ++- components/public-api/handlers/runs.go | 9 ++++++--- components/public-api/handlers/sessions.go | 12 ++++++++---- 5 files changed, 34 insertions(+), 19 deletions(-) diff --git a/components/public-api/handlers/lifecycle.go b/components/public-api/handlers/lifecycle.go index ad41f8880..8b774c29e 100644 --- a/components/public-api/handlers/lifecycle.go +++ b/components/public-api/handlers/lifecycle.go @@ -46,12 +46,13 @@ func StartSession(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/start", project, sessionID) - resp, err := ProxyRequest(c, http.MethodPost, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, nil) if err != nil { log.Printf("Backend request failed for start session %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) @@ -110,12 +111,13 @@ func StopSession(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/stop", project, sessionID) - resp, err := ProxyRequest(c, http.MethodPost, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, nil) if err != nil { log.Printf("Backend request failed for stop session %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) @@ -154,12 +156,13 @@ func InterruptSession(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/interrupt", project, sessionID) - resp, err := ProxyRequest(c, http.MethodPost, path, []byte("{}")) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, []byte("{}")) if err != nil { log.Printf("Backend request failed for interrupt session %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) @@ -181,12 +184,13 @@ func InterruptSession(c *gin.Context) { // On error, it writes the appropriate error response to the gin context. func getSessionPhase(c *gin.Context, project, sessionID string) (string, error) { path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s", project, sessionID) - resp, err := ProxyRequest(c, http.MethodGet, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) if err != nil { log.Printf("Backend request failed for get session phase %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return "", fmt.Errorf("backend unavailable") } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) diff --git a/components/public-api/handlers/proxy.go b/components/public-api/handlers/proxy.go index cf6b58f2d..23719e939 100644 --- a/components/public-api/handlers/proxy.go +++ b/components/public-api/handlers/proxy.go @@ -43,8 +43,9 @@ func getEnvOrDefault(key, defaultValue string) string { return defaultValue } -// ProxyRequest forwards a request to the backend and returns the response -func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Response, error) { +// ProxyRequest forwards a request to the backend and returns the response. +// The caller MUST call the returned context.CancelFunc after reading the response body. +func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Response, context.CancelFunc, error) { fullURL := fmt.Sprintf("%s%s", BackendURL, path) var bodyReader io.Reader @@ -55,11 +56,11 @@ func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Respo // Create context with explicit timeout (in addition to HTTP client timeout) // This ensures we respect context cancellation from the client ctx, cancel := context.WithTimeout(c.Request.Context(), BackendTimeout) - defer cancel() req, err := http.NewRequestWithContext(ctx, method, fullURL, bodyReader) if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + cancel() + return nil, nil, fmt.Errorf("failed to create request: %w", err) } // Forward the token @@ -78,15 +79,16 @@ func ProxyRequest(c *gin.Context, method, path string, body []byte) (*http.Respo resp, err := HTTPClient.Do(req) if err != nil { - return nil, fmt.Errorf("backend request failed: %w", err) + cancel() + return nil, nil, fmt.Errorf("backend request failed: %w", err) } - return resp, nil + return resp, cancel, nil } // ProxyAndRespond proxies a request and writes the response directly func ProxyAndRespond(c *gin.Context, method, path string, body []byte) { - resp, err := ProxyRequest(c, method, path, body) + resp, cancel, err := ProxyRequest(c, method, path, body) if err != nil { // Log detailed error internally, return generic message to user // SECURITY: Never expose internal error details (may contain URLs, tokens, etc.) @@ -94,6 +96,7 @@ func ProxyAndRespond(c *gin.Context, method, path string, body []byte) { c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() // Read response body diff --git a/components/public-api/handlers/proxy_test.go b/components/public-api/handlers/proxy_test.go index 95cd54697..45f37f586 100644 --- a/components/public-api/handlers/proxy_test.go +++ b/components/public-api/handlers/proxy_test.go @@ -79,9 +79,10 @@ func TestProxyRequest_BackendUnavailable(t *testing.T) { c.Set(ContextKeyToken, "test-token") c.Set(ContextKeyProject, "test-project") - resp, err := ProxyRequest(c, http.MethodGet, "/api/projects/test/sessions", nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, "/api/projects/test/sessions", nil) if err == nil { + cancel() resp.Body.Close() t.Error("Expected error for unavailable backend") } diff --git a/components/public-api/handlers/runs.go b/components/public-api/handlers/runs.go index 63a5fcfc8..4523503b0 100644 --- a/components/public-api/handlers/runs.go +++ b/components/public-api/handlers/runs.go @@ -92,12 +92,13 @@ func CreateRun(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/run", project, sessionID) - resp, err := ProxyRequest(c, http.MethodPost, path, reqBody) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, reqBody) if err != nil { log.Printf("Backend request failed for create run: %v", err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) @@ -193,12 +194,13 @@ func SendMessage(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/run", project, sessionID) - resp, err := ProxyRequest(c, http.MethodPost, path, reqBody) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, reqBody) if err != nil { log.Printf("Backend request failed for send message: %v", err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) @@ -265,11 +267,12 @@ func GetSessionRuns(c *gin.Context) { func fetchSessionEvents(c *gin.Context, project, sessionID string) ([]map[string]interface{}, int, error) { path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/export", project, sessionID) - resp, err := ProxyRequest(c, http.MethodGet, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) if err != nil { log.Printf("Backend request failed for export: %v", err) return nil, 0, fmt.Errorf("backend unavailable") } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) diff --git a/components/public-api/handlers/sessions.go b/components/public-api/handlers/sessions.go index fe0dbc9e3..6b0b9a3ef 100644 --- a/components/public-api/handlers/sessions.go +++ b/components/public-api/handlers/sessions.go @@ -22,11 +22,12 @@ func ListSessions(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions", project) - resp, err := ProxyRequest(c, http.MethodGet, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) if err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() // Read response body @@ -78,12 +79,13 @@ func GetSession(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s", project, sessionID) - resp, err := ProxyRequest(c, http.MethodGet, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodGet, path, nil) if err != nil { log.Printf("Backend request failed for session %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() body, err := io.ReadAll(resp.Body) @@ -154,12 +156,13 @@ func CreateSession(c *gin.Context) { path := fmt.Sprintf("/api/projects/%s/agentic-sessions", project) - resp, err := ProxyRequest(c, http.MethodPost, path, reqBody) + resp, cancel, err := ProxyRequest(c, http.MethodPost, path, reqBody) if err != nil { log.Printf("Backend request failed for create session: %v", err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) @@ -203,12 +206,13 @@ func DeleteSession(c *gin.Context) { } path := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s", project, sessionID) - resp, err := ProxyRequest(c, http.MethodDelete, path, nil) + resp, cancel, err := ProxyRequest(c, http.MethodDelete, path, nil) if err != nil { log.Printf("Backend request failed for delete session %s: %v", sessionID, err) c.JSON(http.StatusBadGateway, gin.H{"error": "Backend unavailable"}) return } + defer cancel() defer resp.Body.Close() if resp.StatusCode == http.StatusNoContent || resp.StatusCode == http.StatusOK { From 1fa4714ec0f523baab2051b700abc63cbe5bf5d5 Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Mon, 9 Mar 2026 13:22:00 -0400 Subject: [PATCH 7/8] fix(public-api): use correct backend field names for session creation - Send "initialPrompt" instead of "prompt" (backend field name) - Send flat repo objects {url, branch} instead of nested {input: {url, branch}} - Read "initialPrompt" first in transformSession, falling back to "prompt" - Update tests accordingly --- .../public-api/handlers/integration_test.go | 4 +-- components/public-api/handlers/sessions.go | 17 +++++++------ .../public-api/handlers/sessions_test.go | 25 +++++++++++++++++-- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/components/public-api/handlers/integration_test.go b/components/public-api/handlers/integration_test.go index 6279258bf..765711341 100644 --- a/components/public-api/handlers/integration_test.go +++ b/components/public-api/handlers/integration_test.go @@ -117,8 +117,8 @@ func TestE2E_CreateSession(t *testing.T) { } // Verify request body was transformed correctly - if !strings.Contains(requestBody, "prompt") { - t.Errorf("Expected request body to contain 'prompt', got %s", requestBody) + if !strings.Contains(requestBody, "initialPrompt") { + t.Errorf("Expected request body to contain 'initialPrompt', got %s", requestBody) } } diff --git a/components/public-api/handlers/sessions.go b/components/public-api/handlers/sessions.go index 6b0b9a3ef..18ab79042 100644 --- a/components/public-api/handlers/sessions.go +++ b/components/public-api/handlers/sessions.go @@ -126,7 +126,7 @@ func CreateSession(c *gin.Context) { // Transform to backend format backendReq := map[string]interface{}{ - "prompt": req.Task, + "initialPrompt": req.Task, } if req.DisplayName != "" { backendReq["displayName"] = req.DisplayName @@ -137,12 +137,13 @@ func CreateSession(c *gin.Context) { if len(req.Repos) > 0 { repos := make([]map[string]interface{}, len(req.Repos)) for i, r := range req.Repos { - repos[i] = map[string]interface{}{ - "input": map[string]interface{}{ - "url": r.URL, - "branch": r.Branch, - }, + repo := map[string]interface{}{ + "url": r.URL, } + if r.Branch != "" { + repo["branch"] = r.Branch + } + repos[i] = repo } backendReq["repos"] = repos } @@ -268,7 +269,9 @@ func transformSession(data map[string]interface{}) types.SessionResponse { // Extract spec if spec, ok := data["spec"].(map[string]interface{}); ok { - if prompt, ok := spec["prompt"].(string); ok { + if prompt, ok := spec["initialPrompt"].(string); ok { + session.Task = prompt + } else if prompt, ok := spec["prompt"].(string); ok { session.Task = prompt } if model, ok := spec["model"].(string); ok { diff --git a/components/public-api/handlers/sessions_test.go b/components/public-api/handlers/sessions_test.go index b0fd8effd..3ea114706 100644 --- a/components/public-api/handlers/sessions_test.go +++ b/components/public-api/handlers/sessions_test.go @@ -24,8 +24,8 @@ func TestTransformSession(t *testing.T) { "creationTimestamp": "2026-01-29T10:00:00Z", }, "spec": map[string]interface{}{ - "prompt": "Fix the bug", - "model": "claude-sonnet-4", + "initialPrompt": "Fix the bug", + "model": "claude-sonnet-4", }, "status": map[string]interface{}{ "phase": "Running", @@ -40,6 +40,27 @@ func TestTransformSession(t *testing.T) { CreatedAt: "2026-01-29T10:00:00Z", }, }, + { + name: "Legacy prompt field fallback", + input: map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "session-legacy", + "creationTimestamp": "2026-01-29T10:00:00Z", + }, + "spec": map[string]interface{}{ + "prompt": "Legacy prompt", + }, + "status": map[string]interface{}{ + "phase": "Running", + }, + }, + expected: types.SessionResponse{ + ID: "session-legacy", + Status: "running", + Task: "Legacy prompt", + CreatedAt: "2026-01-29T10:00:00Z", + }, + }, { name: "Completed session with result", input: map[string]interface{}{ From 9c73c1a26a585267507baf1087317aa9d3ed45aa Mon Sep 17 00:00:00 2001 From: Tim Williams Date: Mon, 9 Mar 2026 22:30:11 -0400 Subject: [PATCH 8/8] feat(public-api): forward activeWorkflow and environmentVariables on session creation The public API gateway was silently dropping activeWorkflow and environmentVariables fields from POST /v1/sessions requests. Add these fields to CreateSessionRequest DTO, forward them in the backend request transformation, and update the OpenAPI spec. --- components/public-api/handlers/sessions.go | 15 +++++++++++++++ components/public-api/openapi.yaml | 22 ++++++++++++++++++++++ components/public-api/types/dto.go | 17 +++++++++++++---- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/components/public-api/handlers/sessions.go b/components/public-api/handlers/sessions.go index 18ab79042..b6beea75a 100644 --- a/components/public-api/handlers/sessions.go +++ b/components/public-api/handlers/sessions.go @@ -147,6 +147,21 @@ func CreateSession(c *gin.Context) { } backendReq["repos"] = repos } + if req.ActiveWorkflow != nil && strings.TrimSpace(req.ActiveWorkflow.GitURL) != "" { + wf := map[string]interface{}{ + "gitUrl": req.ActiveWorkflow.GitURL, + } + if req.ActiveWorkflow.Branch != "" { + wf["branch"] = req.ActiveWorkflow.Branch + } + if req.ActiveWorkflow.Path != "" { + wf["path"] = req.ActiveWorkflow.Path + } + backendReq["activeWorkflow"] = wf + } + if len(req.EnvironmentVariables) > 0 { + backendReq["environmentVariables"] = req.EnvironmentVariables + } reqBody, err := json.Marshal(backendReq) if err != nil { diff --git a/components/public-api/openapi.yaml b/components/public-api/openapi.yaml index 4780dc7f3..dd9833d82 100644 --- a/components/public-api/openapi.yaml +++ b/components/public-api/openapi.yaml @@ -480,6 +480,28 @@ components: type: string branch: type: string + activeWorkflow: + $ref: '#/components/schemas/WorkflowRef' + environmentVariables: + type: object + additionalProperties: + type: string + description: Environment variables to inject into the session pod + + WorkflowRef: + type: object + required: [gitUrl] + description: Identifies a workflow by git repository location + properties: + gitUrl: + type: string + description: Git clone URL of the repository containing the workflow + branch: + type: string + description: Branch to check out (defaults to main) + path: + type: string + description: Path within the repository to the workflow directory CreateRunRequest: type: object diff --git a/components/public-api/types/dto.go b/components/public-api/types/dto.go index 24b055471..69b09b599 100644 --- a/components/public-api/types/dto.go +++ b/components/public-api/types/dto.go @@ -23,10 +23,19 @@ type SessionListResponse struct { // CreateSessionRequest is the request body for creating a session type CreateSessionRequest struct { - Task string `json:"task" binding:"required"` - DisplayName string `json:"display_name,omitempty"` - Model string `json:"model,omitempty"` - Repos []Repo `json:"repos,omitempty"` + Task string `json:"task" binding:"required"` + DisplayName string `json:"display_name,omitempty"` + Model string `json:"model,omitempty"` + Repos []Repo `json:"repos,omitempty"` + ActiveWorkflow *WorkflowRef `json:"activeWorkflow,omitempty"` + EnvironmentVariables map[string]string `json:"environmentVariables,omitempty"` +} + +// WorkflowRef identifies a workflow by git repository location. +type WorkflowRef struct { + GitURL string `json:"gitUrl" binding:"required"` + Branch string `json:"branch,omitempty"` + Path string `json:"path,omitempty"` } // Repo represents a repository configuration