diff --git a/console/src/pages/Chat/index.tsx b/console/src/pages/Chat/index.tsx index d5ccf2b19..204a4614f 100644 --- a/console/src/pages/Chat/index.tsx +++ b/console/src/pages/Chat/index.tsx @@ -341,7 +341,7 @@ export default function ChatPage() { console.warn("Failed to get selected agent from storage:", error); } - return fetch(defaultConfig?.api?.baseURL || getApiUrl("/agent/process"), { + return fetch(defaultConfig?.api?.baseURL || getApiUrl("/console/chat"), { method: "POST", headers, body: JSON.stringify(requestBody), diff --git a/src/copaw/app/channels/console/channel.py b/src/copaw/app/channels/console/channel.py index 26d480c42..4498c1fef 100644 --- a/src/copaw/app/channels/console/channel.py +++ b/src/copaw/app/channels/console/channel.py @@ -5,17 +5,18 @@ A lightweight channel that prints all agent responses to stdout. Messages are sent to the agent via the standard AgentApp ``/agent/process`` -endpoint. This channel only handles the **output** side: whenever a -completed message event or a proactive send arrives, it is pretty-printed -to the terminal. +endpoint or via POST /console/chat. This channel handles the **output** side: +whenever a completed message event or a proactive send arrives, it is +pretty-printed to the terminal. """ from __future__ import annotations import logging import os import sys +import json from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, AsyncGenerator, Dict, List, Optional from agentscope_runtime.engine.schemas.agent_schemas import RunStatus @@ -139,6 +140,19 @@ def from_config( filter_thinking=filter_thinking, ) + def resolve_session_id( + self, + sender_id: str, + channel_meta: Optional[dict] = None, + ) -> str: + """Resolve session_id: use explicit meta['session_id'] when provided + (e.g. from the HTTP /console/chat API), otherwise fall back to + 'console:'. + """ + if channel_meta and channel_meta.get("session_id"): + return channel_meta["session_id"] + return f"{self.channel}:{sender_id}" + def build_agent_request_from_native(self, native_payload: Any) -> Any: """ Build AgentRequest from console native payload (dict with @@ -161,8 +175,8 @@ def build_agent_request_from_native(self, native_payload: Any) -> Any: request.channel_meta = meta return request - async def consume_one(self, payload: Any) -> None: - """Process one payload (AgentRequest or native dict) from queue.""" + async def stream_one(self, payload: Any) -> AsyncGenerator[str, None]: + """Process one payload and yield SSE-formatted events""" if isinstance(payload, dict) and "content_parts" in payload: session_id = self.resolve_session_id( payload.get("sender_id") or "", @@ -212,6 +226,14 @@ async def consume_one(self, payload: Any) -> None: ev_type, ) + if hasattr(event, "model_dump_json"): + data = event.model_dump_json() + elif hasattr(event, "json"): + data = event.json() + else: + data = json.dumps({"text": str(event)}) + yield f"data: {data}\n\n" + if obj == "message" and status == RunStatus.Completed: parts = self._message_to_content_parts(event) self._print_parts(parts, ev_type) @@ -242,6 +264,11 @@ async def consume_one(self, payload: Any) -> None: err_msg = str(e).strip() or "An error occurred while processing." self._print_error(err_msg) + async def consume_one(self, payload: Any) -> None: + """Process one payload; drain stream_one (queue/terminal).""" + async for _ in self.stream_one(payload): + pass + # ── pretty-print helpers ──────────────────────────────────────── def _print_parts( diff --git a/src/copaw/app/routers/agent_scoped.py b/src/copaw/app/routers/agent_scoped.py index 4b127775d..f0437adb8 100644 --- a/src/copaw/app/routers/agent_scoped.py +++ b/src/copaw/app/routers/agent_scoped.py @@ -64,6 +64,7 @@ def create_agent_scoped_router() -> APIRouter: from .workspace import router as workspace_router from ..crons.api import router as cron_router from ..runner.api import router as chats_router + from .console import router as console_router # Create parent router with agentId parameter router = APIRouter(prefix="/agents/{agentId}", tags=["agent-scoped"]) @@ -85,5 +86,6 @@ def create_agent_scoped_router() -> APIRouter: router.include_router(skills_router) router.include_router(tools_router) router.include_router(workspace_router) + router.include_router(console_router) return router diff --git a/src/copaw/app/routers/console.py b/src/copaw/app/routers/console.py index c44883f97..0213a52c7 100644 --- a/src/copaw/app/routers/console.py +++ b/src/copaw/app/routers/console.py @@ -1,12 +1,106 @@ # -*- coding: utf-8 -*- -"""Console APIs for push messages.""" +"""Console APIs: push messages and chat.""" +from __future__ import annotations -from fastapi import APIRouter, Query +import json +import logging +from typing import AsyncGenerator, Union +from fastapi import APIRouter, HTTPException, Query, Request +from starlette.responses import StreamingResponse + +from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest + +logger = logging.getLogger(__name__) router = APIRouter(prefix="/console", tags=["console"]) +@router.post( + "/chat", + status_code=200, + summary="Chat with console (streaming response)", + description="Agent API Request Format. " + "See https://runtime.agentscope.io/en/protocol.html for " + "more details.", +) +async def post_console_chat( + request_data: Union[AgentRequest, dict], + request: Request, +) -> StreamingResponse: + """Accept a user message and stream the agent response. + + Accepts AgentRequest or dict, builds native payload, and streams events + via channel.stream_one(). + """ + + from ..agent_context import get_agent_for_request + + workspace = await get_agent_for_request(request) + + # Extract channel info from request + if isinstance(request_data, AgentRequest): + channel_id = request_data.channel or "console" + sender_id = request_data.user_id or "default" + session_id = request_data.session_id or "default" + content_parts = ( + list(request_data.input[0].content) if request_data.input else [] + ) + else: + # Dict format - extract from request body + channel_id = request_data.get("channel", "console") + sender_id = request_data.get("user_id", "default") + session_id = request_data.get("session_id", "default") + input_data = request_data.get("input", []) + + # Extract content from input array + content_parts = [] + if input_data and len(input_data) > 0: + last_msg = input_data[-1] + if hasattr(last_msg, "content"): + content_parts = list(last_msg.content or []) + elif isinstance(last_msg, dict) and "content" in last_msg: + content_parts = last_msg["content"] or [] + + # + console_channel = await workspace.channel_manager.get_channel("console") + if console_channel is None: + raise HTTPException( + status_code=503, + detail="Channel Console not found", + ) + + # Build native payload + native_payload = { + "channel_id": channel_id, + "sender_id": sender_id, + "content_parts": content_parts, + "meta": { + "session_id": session_id, + "user_id": sender_id, + }, + } + + async def event_generator() -> AsyncGenerator[str, None]: + try: + async for event_data in console_channel.stream_one(native_payload): + yield event_data + except Exception as e: + logger.exception("Console chat stream error") + yield f"data: {json.dumps({'error': str(e)})}\n\n" + finally: + yield "data: [DONE]\n\n" + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + + @router.get("/push-messages") async def get_push_messages( session_id: str | None = Query(None, description="Optional session id"),