Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion console/src/pages/Chat/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ export default function ChatPage() {
console.warn("Failed to get selected agent from storage:", error);
}

return fetch(defaultConfig?.api?.baseURL || getApiUrl("/agent/process"), {
return fetch(defaultConfig?.api?.baseURL || getApiUrl("/console/chat"), {
method: "POST",
headers,
body: JSON.stringify(requestBody),
Expand Down
39 changes: 33 additions & 6 deletions src/copaw/app/channels/console/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
A lightweight channel that prints all agent responses to stdout.

Messages are sent to the agent via the standard AgentApp ``/agent/process``
endpoint. This channel only handles the **output** side: whenever a
completed message event or a proactive send arrives, it is pretty-printed
to the terminal.
endpoint or via POST /console/chat. This channel handles the **output** side:
whenever a completed message event or a proactive send arrives, it is
pretty-printed to the terminal.
"""
from __future__ import annotations

import logging
import os
import sys
import json
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, AsyncGenerator, Dict, List, Optional

from agentscope_runtime.engine.schemas.agent_schemas import RunStatus

Expand Down Expand Up @@ -139,6 +140,19 @@ def from_config(
filter_thinking=filter_thinking,
)

def resolve_session_id(
self,
sender_id: str,
channel_meta: Optional[dict] = None,
) -> str:
"""Resolve session_id: use explicit meta['session_id'] when provided
(e.g. from the HTTP /console/chat API), otherwise fall back to
'console:<sender_id>'.
"""
if channel_meta and channel_meta.get("session_id"):
return channel_meta["session_id"]
return f"{self.channel}:{sender_id}"

def build_agent_request_from_native(self, native_payload: Any) -> Any:
"""
Build AgentRequest from console native payload (dict with
Expand All @@ -161,8 +175,8 @@ def build_agent_request_from_native(self, native_payload: Any) -> Any:
request.channel_meta = meta
return request

async def consume_one(self, payload: Any) -> None:
"""Process one payload (AgentRequest or native dict) from queue."""
async def stream_one(self, payload: Any) -> AsyncGenerator[str, None]:
"""Process one payload and yield SSE-formatted events"""
if isinstance(payload, dict) and "content_parts" in payload:
session_id = self.resolve_session_id(
payload.get("sender_id") or "",
Expand Down Expand Up @@ -212,6 +226,14 @@ async def consume_one(self, payload: Any) -> None:
ev_type,
)

if hasattr(event, "model_dump_json"):
data = event.model_dump_json()
elif hasattr(event, "json"):
data = event.json()
else:
data = json.dumps({"text": str(event)})
yield f"data: {data}\n\n"

if obj == "message" and status == RunStatus.Completed:
parts = self._message_to_content_parts(event)
self._print_parts(parts, ev_type)
Expand Down Expand Up @@ -242,6 +264,11 @@ async def consume_one(self, payload: Any) -> None:
err_msg = str(e).strip() or "An error occurred while processing."
self._print_error(err_msg)

async def consume_one(self, payload: Any) -> None:
"""Process one payload; drain stream_one (queue/terminal)."""
async for _ in self.stream_one(payload):
pass

# ── pretty-print helpers ────────────────────────────────────────

def _print_parts(
Expand Down
2 changes: 2 additions & 0 deletions src/copaw/app/routers/agent_scoped.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def create_agent_scoped_router() -> APIRouter:
from .workspace import router as workspace_router
from ..crons.api import router as cron_router
from ..runner.api import router as chats_router
from .console import router as console_router

# Create parent router with agentId parameter
router = APIRouter(prefix="/agents/{agentId}", tags=["agent-scoped"])
Expand All @@ -85,5 +86,6 @@ def create_agent_scoped_router() -> APIRouter:
router.include_router(skills_router)
router.include_router(tools_router)
router.include_router(workspace_router)
router.include_router(console_router)

return router
98 changes: 96 additions & 2 deletions src/copaw/app/routers/console.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,106 @@
# -*- coding: utf-8 -*-
"""Console APIs for push messages."""
"""Console APIs: push messages and chat."""
from __future__ import annotations

from fastapi import APIRouter, Query
import json
import logging
from typing import AsyncGenerator, Union

from fastapi import APIRouter, HTTPException, Query, Request
from starlette.responses import StreamingResponse

from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/console", tags=["console"])


@router.post(
"/chat",
status_code=200,
summary="Chat with console (streaming response)",
description="Agent API Request Format. "
"See https://runtime.agentscope.io/en/protocol.html for "
"more details.",
)
async def post_console_chat(
request_data: Union[AgentRequest, dict],
request: Request,
) -> StreamingResponse:
"""Accept a user message and stream the agent response.

Accepts AgentRequest or dict, builds native payload, and streams events
via channel.stream_one().
"""

from ..agent_context import get_agent_for_request

workspace = await get_agent_for_request(request)

# Extract channel info from request
if isinstance(request_data, AgentRequest):
channel_id = request_data.channel or "console"
sender_id = request_data.user_id or "default"
session_id = request_data.session_id or "default"
content_parts = (
list(request_data.input[0].content) if request_data.input else []
)
else:
# Dict format - extract from request body
channel_id = request_data.get("channel", "console")
sender_id = request_data.get("user_id", "default")
session_id = request_data.get("session_id", "default")
input_data = request_data.get("input", [])

# Extract content from input array
content_parts = []
if input_data and len(input_data) > 0:
last_msg = input_data[-1]
if hasattr(last_msg, "content"):
content_parts = list(last_msg.content or [])
elif isinstance(last_msg, dict) and "content" in last_msg:
content_parts = last_msg["content"] or []

#
console_channel = await workspace.channel_manager.get_channel("console")
if console_channel is None:
raise HTTPException(
status_code=503,
detail="Channel Console not found",
)

# Build native payload
native_payload = {
"channel_id": channel_id,
"sender_id": sender_id,
"content_parts": content_parts,
"meta": {
"session_id": session_id,
"user_id": sender_id,
},
}

async def event_generator() -> AsyncGenerator[str, None]:
try:
async for event_data in console_channel.stream_one(native_payload):
yield event_data
except Exception as e:
logger.exception("Console chat stream error")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
finally:
yield "data: [DONE]\n\n"

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)


@router.get("/push-messages")
async def get_push_messages(
session_id: str | None = Query(None, description="Optional session id"),
Expand Down
Loading