diff --git a/src/copaw/app/channels/base.py b/src/copaw/app/channels/base.py index c4e466bc4..f5c9d1495 100644 --- a/src/copaw/app/channels/base.py +++ b/src/copaw/app/channels/base.py @@ -8,6 +8,7 @@ import asyncio import logging +import re from abc import ABC from typing import ( Optional, @@ -698,6 +699,10 @@ async def send_content_parts( ): media_parts.append(p) body = "\n".join(text_parts) if text_parts else "" + # Filter out AI thinking tags like + body = re.sub(r"[\s\S]*?", "", body) + body = re.sub(r"[\s\S]*?", "", body) + body = body.strip() prefix = (meta or {}).get("bot_prefix", "") or "" if prefix and body: body = prefix + body diff --git a/src/copaw/app/channels/napcat/README.md b/src/copaw/app/channels/napcat/README.md new file mode 100644 index 000000000..04806e180 --- /dev/null +++ b/src/copaw/app/channels/napcat/README.md @@ -0,0 +1,298 @@ +# NapCat Channel for CoPaw + +此 Channel 使用 NapCat (OneBot 11 协议) 连接 QQ,实现消息收发功能。 + +## 功能特性 + +- **消息收发**: 支持群聊和私聊消息的接收与发送 +- **WebSocket 实时消息**: 通过 WebSocket 实时接收消息 +- **HTTP API 发送**: 通过 HTTP API 发送消息 +- **消息过滤**: 支持过滤工具消息和思考过程 +- **灵活的权限控制**: 支持白名单、消息前缀过滤、DM/群策略控制 +- **MCP 集成**: 支持配置 MCP 客户端扩展功能 + +## 使用前提 + +1. **安装并运行 NapCat** + - 参考 [NapCat 官方文档](https://napcatnapcat.github.io/NapCatDocs/) 安装 + - 需要配置 WebSocket 连接用于接收消息 + +2. **配置 NapCat 端** + + 确保 NapCat 的 `config/onebot11.json` 中已启用 WebSocket: + + ```json + { + "httpPort": 3000, + "httpHosts": ["0.0.0.0"], + "wsPort": 3001, + "wsHosts": ["0.0.0.0"], + "enableHttp": true, + "enableWs": true, + "accessToken": "" + } + ``` + +3. **配置 config.json** + + 在 `~/.copaw/config.json` 的 `channels` 中添加: + +```json +{ + "napcat": { + "enabled": true, + "host": "127.0.0.1", + "port": 3000, + "ws_port": 3001, + "access_token": "", + "bot_prefix": "", + "filter_tool_messages": false, + "filter_thinking": false, + "dm_policy": "open", + "group_policy": "open", + "allow_from": [], + "deny_message": "" + } +} +``` + +--- + +## 配置项说明 + +| 配置项 | 必填 | 默认值 | 说明 | +|--------|------|--------|------| +| enabled | 是 | false | 是否启用此 Channel | +| host | 是 | 127.0.0.1 | NapCat 服务器地址 | +| port | 是 | 3000 | NapCat HTTP API 端口 | +| ws_port | 是 | 3001 | NapCat WebSocket 端口 | +| access_token | 否 | - | 访问令牌(如果 NapCat 配置了认证) | +| bot_prefix | 否 | - | 消息前缀,只有带此前缀的消息才会被处理 | +| filter_tool_messages | 否 | false | 是否过滤工具消息 | +| filter_thinking | 否 | false | 是否过滤思考过程 | +| dm_policy | 否 | open | 私聊策略:open/deny | +| group_policy | 否 | open | 群消息策略:open/deny | +| allow_from | 否 | [] | 允许的发送者列表(QQ 号) | +| deny_message | 否 | - | 拒绝时的提示消息 | + +--- + +## 快速开始 + +### 1. 安装 NapCat + +参考 [NapCat 官方文档](https://napcatnapcat.github.io/NapCatDocs/) 下载并安装 NapCat。 + +### 2. 配置 NapCat + +编辑 NapCat 的 `config/onebot11.json`,确保启用 HTTP 和 WebSocket: + +```json +{ + "httpPort": 3000, + "httpHosts": ["0.0.0.0"], + "wsPort": 3001, + "wsHosts": ["0.0.0.0"], + "enableHttp": true, + "enableWs": true, + "accessToken": "" +} +``` + +### 3. 启动 NapCat + +```bash +# 运行 NapCat +./NapCat.sh +# 或 Windows +NapCat.exe +``` + +### 4. 启动 CoPaw + +确保 config.json 中 NapCat channel 已启用,然后启动 CoPaw。 + +### 5. 使用方式 + +- **私聊**: 直接发送消息给机器人 +- **群聊**: 在群中 @机器人 或使用配置的前缀 + +--- + +## MCP 配置(可选) + +你可以通过配置 MCP 客户端来扩展 NapCat Channel 的功能。以下是几种常用的 MCP 配置示例: + +### 方式一:通过 CoPaw 控制台添加(推荐) + +1. 打开 CoPaw 控制台,进入 **智能体 → MCP** +2. 点击 **+ 创建** 按钮 +3. 粘贴 MCP 客户端的 JSON 配置 +4. 点击 **创建** 完成导入 + +### 方式二:手动编辑 config.json + +在 `~/.copaw/config.json` 的 `mcp` 节点添加: + +#### 示例 1: NapCat MCP 工具服务器 + +```json +{ + "mcp": { + "clients": { + "napcat_mcp": { + "name": "napcat_mcp", + "description": "NapCat MCP 工具服务", + "enabled": true, + "transport": "stdio", + "command": "npx", + "args": ["-y", "@some/mcp-server"], + "env": {} + } + } + } +} +``` + +#### 示例 2: 带认证的 HTTP MCP 服务 + +```json +{ + "mcp": { + "clients": { + "my_mcp": { + "name": "my_mcp", + "description": "我的 MCP 服务", + "enabled": true, + "transport": "streamable_http", + "url": "http://localhost:8080/mcp", + "headers": { + "Authorization": "Bearer your-token-here" + } + } + } + } +} +``` + +#### 示例 3: 标准 mcpServers 格式 + +```json +{ + "mcp": { + "clients": { + "filesystem": { + "name": "filesystem", + "description": "文件系统访问", + "enabled": true, + "transport": "stdio", + "command": "npx", + "args": [ + "-y", + "@modelcontextprotocol/server-filesystem", + "/Users/username/Documents" + ] + } + } + } +} +``` + +--- + +## 高级配置 + +### 使用消息前缀过滤 + +只有带特定前缀的消息才会被处理: + +```json +{ + "napcat": { + "enabled": true, + "bot_prefix": "/", + "host": "127.0.0.1", + "port": 3000, + "ws_port": 3001 + } +} +``` + +此时,用户需要发送 `/hello` 这样的消息才会被处理。 + +### 白名单用户 + +只允许特定用户使用: + +```json +{ + "napcat": { + "enabled": true, + "host": "127.0.0.1", + "port": 3000, + "ws_port": 3001, + "allow_from": ["123456789", "987654321"], + "deny_message": "抱歉,你没有使用权限" + } +} +``` + +### 过滤工具消息 + +过滤 CoPaw 内部的工具调用消息: + +```json +{ + "napcat": { + "enabled": true, + "host": "127.0.0.1", + "port": 3000, + "ws_port": 3001, + "filter_tool_messages": true, + "filter_thinking": true + } +} +``` + +--- + +## 故障排除 + +### 1. 连接失败 + +确保: +- NapCat 已启动并运行 +- 端口 3000 和 3001 未被占用 +- 防火墙允许这些端口 + +### 2. 消息发送失败 + +检查: +- 机器人是否有发送消息的权限 +- 群号/QQ号是否正确 +- access_token 是否匹配 + +### 3. 110 错误 + +如果收到 "Error 110: 被移出群" 相关的错误: +- 检查 NapCat 日志 +- 确保机器人仍在群中 +- 验证 group_id 配置正确 + +### 4. 查看日志 + +CoPaw 会输出详细日志,关注以下关键词: +- `NapCat connected` - WebSocket 连接成功 +- `NapCat send` - 发送消息 +- `NapCat receive` - 接收消息 +- `NapCat error` - 错误信息 + +--- + +## 注意事项 + +- 确保 NapCat 已经启动并正常运行 +- WebSocket 用于接收消息,HTTP API 用于发送消息 +- 如果 NapCat 配置了 access_token,需要在配置中填写 +- 群聊和私聊使用不同的 session_id 格式进行区分 + diff --git a/src/copaw/app/channels/napcat/__init__.py b/src/copaw/app/channels/napcat/__init__.py new file mode 100644 index 000000000..2c2b70084 --- /dev/null +++ b/src/copaw/app/channels/napcat/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +"""NapCat Channel for CoPaw.""" +from .channel import NapCatChannel + +__all__ = ["NapCatChannel"] diff --git a/src/copaw/app/channels/napcat/api.py b/src/copaw/app/channels/napcat/api.py new file mode 100644 index 000000000..f1c810447 --- /dev/null +++ b/src/copaw/app/channels/napcat/api.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +"""NapCat API calls.""" + +from typing import Any, Dict, List, Optional + +import aiohttp + +from .exceptions import NapCatApiError + + +async def api_request( + session: aiohttp.ClientSession, + host: str, + port: int, + access_token: str, + method: str, + path: str, + body: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Make HTTP API request to NapCat.""" + url = f"http://{host}:{port}{path}" + headers = {"Content-Type": "application/json"} + if access_token: + headers["Authorization"] = f"Bearer {access_token}" + + kwargs: Dict[str, Any] = {"headers": headers} + if body is not None: + kwargs["json"] = body + + async with session.request(method, url, **kwargs) as resp: + data = await resp.json() + if resp.status >= 400: + raise NapCatApiError(path=path, status=resp.status, data=data) + # Check OneBot retcode and status + retcode = data.get("retcode", 0) + status = data.get("status", "ok") + if retcode != 0 or status == "failed": + msg = data.get("message") or data.get("wording", "Unknown error") + raise NapCatApiError( + path=path, + status=resp.status, + data=data, + message=msg, + ) + return data + + +async def send_group_message( + session: aiohttp.ClientSession, + host: str, + port: int, + access_token: str, + group_id: str, + message: Any, + auto_escape: bool = True, +) -> int: + """Send message to group. + + Returns: + message_id on success + """ + body = { + "group_id": group_id, + "message": message, + "auto_escape": auto_escape, + } + result = await api_request( + session, + host, + port, + access_token, + "POST", + "/send_group_msg", + body, + ) + return (result.get("data") or {}).get("message_id", 0) + + +async def send_private_message( + session: aiohttp.ClientSession, + host: str, + port: int, + access_token: str, + user_id: str, + message: Any, + auto_escape: bool = True, +) -> int: + """Send private message. + + Returns: + message_id on success + """ + body = { + "user_id": user_id, + "message": message, + "auto_escape": auto_escape, + } + result = await api_request( + session, + host, + port, + access_token, + "POST", + "/send_private_msg", + body, + ) + return (result.get("data") or {}).get("message_id", 0) + + +async def get_login_info( + session: aiohttp.ClientSession, + host: str, + port: int, + access_token: str, +) -> Dict[str, Any]: + """Get login info.""" + result = await api_request( + session, + host, + port, + access_token, + "POST", + "/get_login_info", + None, + ) + return result.get("data", {}) + + +async def get_group_list( + session: aiohttp.ClientSession, + host: str, + port: int, + access_token: str, +) -> List[Dict[str, Any]]: + """Get group list.""" + result = await api_request( + session, + host, + port, + access_token, + "POST", + "/get_group_list", + None, + ) + return result.get("data", []) diff --git a/src/copaw/app/channels/napcat/channel.py b/src/copaw/app/channels/napcat/channel.py new file mode 100644 index 000000000..df2e0f842 --- /dev/null +++ b/src/copaw/app/channels/napcat/channel.py @@ -0,0 +1,601 @@ +# -*- coding: utf-8 -*- +"""NapCat Channel for CoPaw. + +NapCat 基于 OneBot 11 协议,使用 WebSocket 接收消息,HTTP API 发送消息。 +""" + +from __future__ import annotations + +import asyncio +import logging +import threading +from pathlib import Path +from typing import Any, Dict, List, Optional + +import aiohttp + +from agentscope_runtime.engine.schemas.agent_schemas import ( + TextContent, + ContentType, +) + +from ..base import ( + BaseChannel, + OnReplySent, + ProcessHandler, +) +from .constants import DEFAULT_MEDIA_DIR +from .api import ( + send_group_message, + send_private_message, + get_login_info, + get_group_list, +) +from .message import ( + parse_message, + build_message_segment, +) +from .websocket import WebSocketClient + +logger = logging.getLogger(__name__) + + +class NapCatChannel(BaseChannel): + """NapCat Channel: + WebSocket events -> Incoming -> process -> HTTP API reply. + + Based on OneBot 11 protocol. + """ + + channel = "napcat" + + def __init__( + self, + process: ProcessHandler, + enabled: bool, + host: str = "127.0.0.1", + port: int = 3000, + ws_port: int = 3001, + access_token: str = "", + bot_prefix: str = "", + on_reply_sent: OnReplySent = None, + show_tool_details: bool = True, + filter_tool_messages: bool = False, + filter_thinking: bool = False, + dm_policy: str = "open", + group_policy: str = "open", + allow_from: List[str] = None, + deny_message: str = "", + media_dir: str = "", + ): + super().__init__( + process, + on_reply_sent=on_reply_sent, + show_tool_details=show_tool_details, + filter_tool_messages=filter_tool_messages, + filter_thinking=filter_thinking, + dm_policy=dm_policy, + group_policy=group_policy, + allow_from=allow_from, + deny_message=deny_message, + ) + self.enabled = enabled + self.host = host + self.port = port + self.ws_port = ws_port + self.access_token = access_token + self.bot_prefix = bot_prefix + self._media_dir = ( + Path(media_dir).expanduser() if media_dir else DEFAULT_MEDIA_DIR + ) + + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._ws_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._account_id = "default" + + self._http: Optional[aiohttp.ClientSession] = None + self._login_info: Optional[Dict[str, Any]] = None + self._group_list: List[Dict[str, Any]] = [] + + @classmethod + def from_config( + cls, + process: ProcessHandler, + config: Any, + on_reply_sent: OnReplySent = None, + show_tool_details: bool = True, + filter_tool_messages: bool = False, + filter_thinking: bool = False, + ) -> "NapCatChannel": + """Create channel from config dict or NapCatConfig model.""" + + def get_config_attr(config_obj: Any, attr: str, default: Any) -> Any: + """Get attribute from dict or Pydantic model.""" + if hasattr(config_obj, "get"): + return config_obj.get(attr, default) + return getattr(config_obj, attr, default) + + # Read config values using helper + enabled = get_config_attr(config, "enabled", False) + host = get_config_attr(config, "host", "127.0.0.1") + port = get_config_attr(config, "port", 3000) + ws_port = get_config_attr(config, "ws_port", 3001) + access_token = get_config_attr(config, "access_token", "") + bot_prefix = get_config_attr(config, "bot_prefix", "") + dm_policy = get_config_attr(config, "dm_policy", "open") + group_policy = get_config_attr(config, "group_policy", "open") + allow_from = get_config_attr(config, "allow_from", []) + deny_message = get_config_attr(config, "deny_message", "") + media_dir = get_config_attr(config, "media_dir", "") + + return cls( + process=process, + enabled=enabled, + host=host, + port=port, + ws_port=ws_port, + access_token=access_token, + bot_prefix=bot_prefix, + on_reply_sent=on_reply_sent, + show_tool_details=show_tool_details, + filter_tool_messages=filter_tool_messages, + filter_thinking=filter_thinking, + dm_policy=dm_policy, + group_policy=group_policy, + allow_from=allow_from, + deny_message=deny_message, + media_dir=media_dir, + ) + + def _is_bot_mentioned( + self, + raw_message: Any, + ) -> bool: + """Check if the bot was mentioned in the message. + + Checks for CQ code: [CQ:at,qq={bot_qq}] + Also supports the 'all' mention: [CQ:at,qq=all] + + Args: + raw_message: The raw message from OneBot 11 event + + Returns: + True if bot is mentioned + """ + if not raw_message: + return False + + # Get bot QQ from login info + bot_qq = ( + str(self._login_info.get("user_id", "")) + if self._login_info + else "" + ) + if not bot_qq: + return False + + # Handle both string and list message formats + if isinstance(raw_message, str): + # Check for CQ at code with bot QQ + if f"[CQ:at,qq={bot_qq}]" in raw_message: + return True + # Also check for @all (everyone) + if "[CQ:at,qq=all]" in raw_message: + return True + elif isinstance(raw_message, list): + # Check message segments for at type + for segment in raw_message: + if isinstance(segment, dict): + seg_type = segment.get("type", "") + if seg_type == "at": + seg_data = segment.get("data", {}) + qq = seg_data.get("qq", "") + # Check if it's mentioning the bot or everyone + if str(qq) == bot_qq or qq == "all": + return True + + return False + + def _clean_at_mention( + self, + raw_message: Any, + ) -> Any: + """Remove the @ bot mention from the message. + + This is used to clean the message before processing, + so the bot doesn't see the @ mention in its input. + + Args: + raw_message: The raw message from OneBot 11 event + + Returns: + Cleaned message without the @ mention + """ + if not raw_message: + return raw_message + + # Get bot QQ from login info + bot_qq = ( + str(self._login_info.get("user_id", "")) + if self._login_info + else "" + ) + + # Handle string message format + if isinstance(raw_message, str): + cleaned = raw_message + # Remove CQ at code with bot QQ + if bot_qq: + cleaned = cleaned.replace(f"[CQ:at,qq={bot_qq}]", "") + # Remove @all CQ code + cleaned = cleaned.replace("[CQ:at,qq=all]", "") + # Also try without comma format + if bot_qq: + cleaned = cleaned.replace(f"[CQ:at qq={bot_qq}]", "") + cleaned = cleaned.replace("[CQ:at qq=all]", "") + return cleaned.strip() + + # Handle list message format + if isinstance(raw_message, list): + cleaned_segments = [] + for segment in raw_message: + if isinstance(segment, dict): + seg_type = segment.get("type", "") + if seg_type == "at": + # Skip at segments + seg_data = segment.get("data", {}) + qq = seg_data.get("qq", "") + # Skip if it's mentioning bot or everyone + if str(qq) == bot_qq or qq == "all": + continue + # Keep other segments + cleaned_segments.append(segment) + else: + cleaned_segments.append(segment) + return cleaned_segments + + return raw_message + + def _should_process( + self, + user_id: str, + group_id: Optional[str] = None, + raw_message: Any = None, + ) -> bool: + """Check if the message should be processed based on policy. + + For group messages, also checks if the bot was mentioned (unless + bot_prefix is set, in which case message must start with prefix). + + Args: + user_id: The user ID who sent the message + group_id: The group ID if this is a group message + raw_message: The raw message content for at-mention checking + """ + # Check allow_from list + if self.allow_from and user_id not in self.allow_from: + return False + + # Check group policy + if group_id: + if self.group_policy == "deny": + return False + + # For group messages, check if bot is mentioned + # (only if no bot_prefix is set, since prefix takes precedence) + if not self.bot_prefix: + if not self._is_bot_mentioned(raw_message): + # Bot not mentioned in group message, skip processing + logger.debug( + f"napcat: group message without bot mention, " + f"group={group_id}, user={user_id}", + ) + return False + else: + # Private message + if self.dm_policy == "deny": + return False + + return True + + async def send( + self, + to_handle: str, + text: str, + meta: Optional[Dict[str, Any]] = None, + ) -> None: + """Send text message via NapCat HTTP API. + + Args: + to_handle: Target ID (group_id or user_id) + text: Message text + meta: Optional metadata including message_type + """ + if not self.enabled or not text.strip(): + return + + text = text.strip() + meta = meta or {} + + # Build message segment (simple text) + message = build_message_segment(text, auto_escape=True) + + # Determine if this is group or private message + # Check multiple sources for group_id + group_id = meta.get("group_id") + session_id = meta.get("session_id", "") + + # If no group_id in meta, try to extract from session_id + if not group_id and session_id.startswith("napcat:group:"): + group_id = session_id.split(":")[-1] + + message_type = meta.get("message_type", "") + is_group = ( + bool(group_id) + or message_type == "group" + or to_handle.startswith("group:") + ) + + if is_group: + # group_id already extracted above from session_id if needed + # Don't overwrite with meta.get("group_id") + if not group_id: + # Fallback: extract from session_id in session_id field + session_id = meta.get("session_id", "") + if session_id.startswith("napcat:group:"): + group_id = session_id.split(":")[-1] + if not group_id: + logger.warning( + "NapCat send: group_id is None, " + f"message_type={message_type}, session_id={session_id}", + ) + try: + await send_group_message( + self._http, + self.host, + self.port, + self.access_token, + group_id, + message, + ) + except Exception: + logger.exception(f"NapCat send to group {group_id} failed") + else: + # Private message + user_id = meta.get("user_id") or to_handle + try: + await send_private_message( + self._http, + self.host, + self.port, + self.access_token, + user_id, + message, + ) + except Exception: + logger.exception("NapCat send to user %s failed", user_id) + + def build_agent_request_from_native(self, native_payload: Any) -> Any: + """Build AgentRequest from NapCat/OneBot 11 event dict.""" + payload = native_payload if isinstance(native_payload, dict) else {} + + # Extract common fields + post_type = payload.get("post_type", "") + message_type = payload.get("message_type", "") + user_id = str(payload.get("user_id", "")) + group_id = ( + str(payload.get("group_id", "")) + if payload.get("group_id") + else None + ) + raw_message = payload.get("message", "") + message_id = payload.get("message_id", 0) + + # Skip if not a message + if post_type != "message": + return None + + # Skip if has bot_prefix and message doesn't start with it + if self.bot_prefix and isinstance(raw_message, str): + if not raw_message.startswith(self.bot_prefix): + # Check if it's an @ mention (CQ code) + bot_qq = ( + str(self._login_info.get("user_id", "")) + if self._login_info + else "" + ) + if bot_qq and f"[CQ:at,qq={bot_qq}]" not in raw_message: + return None + + # Check policy (pass raw_message for at-mention detection) + if not self._should_process(user_id, group_id, raw_message): + return None + + # Clean the message by removing @bot mention (for processing) + cleaned_message = self._clean_at_mention(raw_message) + + # Parse message to content parts + content_parts = parse_message(cleaned_message) + + # Build metadata + meta = { + "message_type": message_type, + "message_id": message_id, + "sender_id": user_id, + "group_id": group_id, + "incoming_raw": payload, + } + + # Build request + return self.build_agent_request_from_user_content( + channel_id=self.channel, + sender_id=user_id, + session_id=self.resolve_session_id(user_id, meta), + content_parts=content_parts, + channel_meta=meta, + ) + + def get_to_handle_from_request(self, request: Any) -> str: + """Resolve send target (to_handle) from AgentRequest. + + For group messages, use group_id; otherwise use user_id. + """ + send_meta = getattr(request, "channel_meta", None) or {} + group_id = send_meta.get("group_id") + message_type = send_meta.get("message_type") + + # Try to get group_id from session_id if not present + if not group_id and message_type == "group": + session_id = getattr(request, "session_id", "") or "" + if session_id.startswith("napcat:group:"): + group_id = session_id.split(":")[-1] + + if group_id: + return group_id + return getattr(request, "user_id", "") or "" + + async def _before_consume_process( + self, + request: Any, + ) -> None: + """Set up send_meta before processing request.""" + # Get or create channel_meta + send_meta = getattr(request, "channel_meta", None) + if send_meta is None: + send_meta = {} + setattr(request, "channel_meta", send_meta) + + # Add bot_prefix to send_meta + send_meta.setdefault("bot_prefix", self.bot_prefix) + + # Add session_id to send_meta so send method can access it + send_meta["session_id"] = getattr(request, "session_id", "") + + async def _on_consume_error( + self, + request: Any, + to_handle: str, + err_text: str, + ) -> None: + """Handle error with bot_prefix prefix.""" + # Add bot_prefix to error message + prefixed_err_text = self.bot_prefix + err_text + + send_meta = getattr(request, "channel_meta", None) or {} + await self.send_content_parts( + to_handle, + [TextContent(type=ContentType.TEXT, text=prefixed_err_text)], + send_meta, + ) + + def _handle_event(self, payload: Dict[str, Any]) -> None: + """Handle incoming NapCat/OneBot 11 event.""" + post_type = payload.get("post_type", "") + + if post_type == "message": + # Build and enqueue request + request = self.build_agent_request_from_native(payload) + if request and self._enqueue is not None: + self._enqueue(request) + logger.info( + f"napcat recv: type={payload.get('message_type')} " + f"user={payload.get('user_id')} " + f"group={payload.get('group_id', 'N/A')}", + ) + elif post_type == "notice": + # Handle notice events (optional) + logger.debug(f"napcat notice: {payload.get('notice_type')}") + elif post_type == "request": + # Handle request events (optional) + logger.debug(f"napcat request: {payload.get('request_type')}") + else: + logger.debug(f"napcat unknown event: {post_type}") + + async def start(self) -> None: + """Start the channel.""" + if not self.enabled: + logger.debug("napcat channel disabled") + return + + if not self.host: + raise RuntimeError("NapCat host is required") + + self._loop = asyncio.get_running_loop() + self._stop_event.clear() + + # Start HTTP session + self._http = aiohttp.ClientSession() + + # Test connection and get login info + try: + self._login_info = await get_login_info( + self._http, + self.host, + self.port, + self.access_token, + ) + logger.info(f"napcat logged in as: {self._login_info}") + + self._group_list = await get_group_list( + self._http, + self.host, + self.port, + self.access_token, + ) + logger.info(f"napcat joined {len(self._group_list)} groups") + except Exception as e: + logger.warning(f"napcat initial fetch failed: {e}") + + # Start WebSocket thread + ws_client = WebSocketClient( + host=self.host, + ws_port=self.ws_port, + access_token=self.access_token, + stop_event=self._stop_event, + message_handler=self._handle_event, + ) + self._ws_thread = threading.Thread( + target=ws_client.run_forever, + daemon=True, + ) + self._ws_thread.start() + logger.info("napcat channel started") + + async def stop(self) -> None: + """Stop the channel.""" + if not self.enabled: + return + + self._stop_event.set() + if self._ws_thread: + self._ws_thread.join(timeout=8) + + if self._http is not None: + await self._http.close() + self._http = None + + logger.info("napcat channel stopped") + + def resolve_session_id( + self, + sender_id: str, + channel_meta: Optional[Dict[str, Any]] = None, + ) -> str: + """Resolve session_id based on message type. + + Group messages: use group_id as session key + Private messages: use user_id as session key + """ + if channel_meta is None: + return f"{self.channel}:{sender_id}" + + message_type = channel_meta.get("message_type", "") + group_id = channel_meta.get("group_id") + + if message_type == "group" and group_id: + # Group message: session per group + return f"{self.channel}:group:{group_id}" + else: + # Private message: session per user + return f"{self.channel}:{sender_id}" diff --git a/src/copaw/app/channels/napcat/constants.py b/src/copaw/app/channels/napcat/constants.py new file mode 100644 index 000000000..4d5ebc71f --- /dev/null +++ b/src/copaw/app/channels/napcat/constants.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +"""NapCat constants and configuration.""" + +from pathlib import Path + +# Reconnect settings +RECONNECT_DELAYS = [1, 2, 5, 10, 30, 60] +MAX_RECONNECT_ATTEMPTS = 100 +QUICK_DISCONNECT_THRESHOLD = 5 +MAX_QUICK_DISCONNECT_COUNT = 3 + +# Default paths +DEFAULT_MEDIA_DIR = Path("~/.copaw/media/napcat").expanduser() + +# Markdown detection regex patterns +MARKDOWN_PATTERNS = [ + r"^#{1,6}\s+.+", # Headers (# to ######) + r"\*\*[^*]+\*\*", # Bold (**text**) + r"(?\s+", # Quote + r"\|.+\|", # Table + r"\[.+\]\(.+\)", # Link + r"!\[.+\]\(.+\)", # Image +] diff --git a/src/copaw/app/channels/napcat/exceptions.py b/src/copaw/app/channels/napcat/exceptions.py new file mode 100644 index 000000000..6f6d2834b --- /dev/null +++ b/src/copaw/app/channels/napcat/exceptions.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +"""NapCat exceptions.""" + +from typing import Any, Optional + + +class NapCatApiError(RuntimeError): + """HTTP error returned by NapCat API.""" + + def __init__( + self, + path: str, + status: int, + data: Any, + message: Optional[str] = None, + ): + self.path = path + self.status = status + self.data = data + self.message = message + super().__init__(f"NapCat API {path} {status}: {message or data}") diff --git a/src/copaw/app/channels/napcat/message.py b/src/copaw/app/channels/napcat/message.py new file mode 100644 index 000000000..29fcfbde6 --- /dev/null +++ b/src/copaw/app/channels/napcat/message.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- +"""NapCat message parsing and building.""" + +import re +from typing import Any, Dict, List + +from agentscope_runtime.engine.schemas.agent_schemas import ( + TextContent, + ImageContent, + VideoContent, + AudioContent, + FileContent, + ContentType, +) + +from ..base import OutgoingContentPart +from .constants import MARKDOWN_PATTERNS + + +def parse_message( # pylint: disable=R0912 + message: Any, +) -> List[OutgoingContentPart]: + """Parse OneBot 11 message to content parts. + + Handles: + - Plain text (str) + - Array of message segments + - Message segments: text, image, record, video, file, etc. + """ + parts: List[OutgoingContentPart] = [] + + if not message: + return parts + + # If message is a string (plain text) + if isinstance(message, str): + if message.strip(): + parts.append( + TextContent( + type=ContentType.TEXT, + text=message.strip(), + ), + ) + return parts + + # If message is a list of segments + if isinstance(message, list): + for segment in message: + seg_type = segment.get("type", "") + seg_data = segment.get("data", {}) + + if seg_type == "text": + text = seg_data.get("text", "").strip() + if text: + parts.append(TextContent(type=ContentType.TEXT, text=text)) + + elif seg_type == "image": + # Image can be file path, URL, or base64 + image_data = seg_data.get("file", seg_data.get("url", "")) + if image_data: + parts.append( + ImageContent( + type=ContentType.IMAGE, + image_url=image_data, + ), + ) + + elif seg_type == "record": # Voice + record_data = seg_data.get("file", seg_data.get("url", "")) + if record_data: + parts.append( + AudioContent( + type=ContentType.AUDIO, + data=record_data, + ), + ) + + elif seg_type == "video": + video_data = seg_data.get("file", seg_data.get("url", "")) + if video_data: + parts.append( + VideoContent( + type=ContentType.VIDEO, + video_url=video_data, + ), + ) + + elif seg_type == "file": + file_data = seg_data.get("file", seg_data.get("url", "")) + file_name = seg_data.get("name", "file") + if file_data: + parts.append( + FileContent( + type=ContentType.FILE, + filename=file_name, + file_url=file_data, + ), + ) + + return parts + + +def build_message_segment( # pylint: disable=R0912 + text: str, + auto_escape: bool = True, +) -> Any: + """Build OneBot 11 message segment from text. + + Args: + text: Message text + auto_escape: Whether to escape special characters + + Returns: + Message segment (string or list of segments) + """ + if auto_escape: + # Simple escape for CQ codes + text = text.replace("&", "&") + text = text.replace(",", ",") + text = text.replace("[", "[") + text = text.replace("]", "]") + return text + + +def _contains_markdown(text: str) -> bool: + """Detect if text contains Markdown syntax. + + Args: + text: Text to check + + Returns: + True if text appears to contain Markdown syntax + """ + # Quick check: if no special characters, likely not markdown + if not any(c in text for c in "#*_`~-|[]"): + return False + + # Check each pattern + for pattern in MARKDOWN_PATTERNS: + if re.search(pattern, text, re.MULTILINE): + return True + + return False + + +def build_markdown_message(text: str) -> List[Dict[str, Any]]: + """Build message segment with Markdown if detected. + + Args: + text: Message text + + Returns: + Message segment (string or list of segments) + """ + if _contains_markdown(text): + # Escape special CQ code characters but allow markdown + escaped = text.replace("&", "&") + return [{"type": "markdown", "data": {"content": escaped}}] + else: + return [build_message_segment(text, auto_escape=True)] + + +def is_markdown_error(exc: Exception) -> bool: + """Check if exception is related to markdown sending failure. + + NapCat timeout errors often occur when sending markdown messages. + We fallback to plaintext for these errors. + + Args: + exc: Exception to check + + Returns: + True if this looks like a markdown-related error + """ + error_msg = str(exc).lower() + # Timeout errors are likely markdown-related + if "timeout" in error_msg: + return True + # Check for other markdown-related keywords + markdown_keywords = ["markdown", "json", "ntEvent"] + return any(kw in error_msg for kw in markdown_keywords) diff --git a/src/copaw/app/channels/napcat/websocket.py b/src/copaw/app/channels/napcat/websocket.py new file mode 100644 index 000000000..e1844a941 --- /dev/null +++ b/src/copaw/app/channels/napcat/websocket.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +"""NapCat WebSocket client.""" + +import json +import logging +import threading +import time +from typing import Any, Callable, Dict + +import websocket + +from .constants import ( + RECONNECT_DELAYS, + MAX_RECONNECT_ATTEMPTS, + QUICK_DISCONNECT_THRESHOLD, + MAX_QUICK_DISCONNECT_COUNT, +) + +logger = logging.getLogger(__name__) + + +class WebSocketClient: + """WebSocket client for NapCat.""" + + def __init__( + self, + host: str, + ws_port: int, + access_token: str, + stop_event: threading.Event, + message_handler: Callable[[Dict[str, Any]], None], + ): + self.host = host + self.ws_port = ws_port + self.access_token = access_token + self._stop_event = stop_event + self._message_handler = message_handler + + def run_forever(self) -> None: + """Run WebSocket client to receive events.""" + reconnect_attempts = 0 + last_connect_time = 0.0 + quick_disconnect_count = 0 + + def connect() -> bool: + # pylint: disable=invalid-name + nonlocal reconnect_attempts + nonlocal last_connect_time, quick_disconnect_count + if self._stop_event.is_set(): + return False + + ws_url = f"ws://{self.host}:{self.ws_port}/ws" + headers = {} + if self.access_token: + headers["Authorization"] = f"Bearer {self.access_token}" + + logger.info(f"napcat connecting to {ws_url}") + + try: + ws = websocket.create_connection( + ws_url, + header=headers, + timeout=30, + ) + except Exception as e: + logger.warning(f"napcat ws connect failed: {e}") + return True + + current_ws = ws + + try: + while not self._stop_event.is_set(): + try: + raw = current_ws.recv() + except websocket.WebSocketTimeoutException: + continue + except websocket.WebSocketConnectionClosedException: + break + + if not raw: + break + + try: + payload = json.loads(raw) + except json.JSONDecodeError: + logger.warning(f"napcat invalid JSON: {raw[:200]}") + continue + + # Handle OneBot 11 event + self._message_handler(payload) + + except Exception as e: + logger.exception(f"napcat ws loop: {e}") + finally: + try: + current_ws.close() + except Exception: + pass + + # Calculate reconnect delay + if ( + last_connect_time + and (time.time() - last_connect_time) + < QUICK_DISCONNECT_THRESHOLD + ): + quick_disconnect_count += 1 + if quick_disconnect_count >= MAX_QUICK_DISCONNECT_COUNT: + quick_disconnect_count = 0 + delay = 60 # Rate limit + else: + delay = RECONNECT_DELAYS[ + min(reconnect_attempts, len(RECONNECT_DELAYS) - 1) + ] + else: + quick_disconnect_count = 0 + delay = RECONNECT_DELAYS[ + min(reconnect_attempts, len(RECONNECT_DELAYS) - 1) + ] + + reconnect_attempts += 1 + if reconnect_attempts >= MAX_RECONNECT_ATTEMPTS: + logger.error("napcat max reconnect attempts reached") + return False + + logger.info( + f"napcat reconnecting in {delay}s " + f"(attempt {reconnect_attempts})", + ) + self._stop_event.wait(timeout=delay) + return not self._stop_event.is_set() + + while connect(): + pass + self._stop_event.set() + logger.info("napcat ws thread stopped") diff --git a/src/copaw/app/channels/registry.py b/src/copaw/app/channels/registry.py index 8cbd9d1b6..4277c30f6 100644 --- a/src/copaw/app/channels/registry.py +++ b/src/copaw/app/channels/registry.py @@ -22,6 +22,7 @@ "dingtalk": (".dingtalk", "DingTalkChannel"), "feishu": (".feishu", "FeishuChannel"), "qq": (".qq", "QQChannel"), + "napcat": (".napcat", "NapCatChannel"), "telegram": (".telegram", "TelegramChannel"), "mattermost": (".mattermost", "MattermostChannel"), "mqtt": (".mqtt", "MQTTChannel"), diff --git a/src/copaw/config/config.py b/src/copaw/config/config.py index 2e8757464..6631c14ca 100644 --- a/src/copaw/config/config.py +++ b/src/copaw/config/config.py @@ -63,6 +63,16 @@ class QQConfig(BaseChannelConfig): markdown_enabled: bool = True +class NapCatConfig(BaseChannelConfig): + """NapCat (OneBot 11) channel: self-hosted QQ bot.""" + + host: str = "127.0.0.1" + port: int = 3000 + ws_port: int = 3001 + access_token: str = "" + media_dir: str = "~/.copaw/media" + + class TelegramConfig(BaseChannelConfig): bot_token: str = "" http_proxy: str = "" @@ -134,6 +144,7 @@ class ChannelConfig(BaseModel): dingtalk: DingTalkConfig = DingTalkConfig() feishu: FeishuConfig = FeishuConfig() qq: QQConfig = QQConfig() + napcat: NapCatConfig = NapCatConfig() telegram: TelegramConfig = TelegramConfig() mattermost: MattermostConfig = MattermostConfig() mqtt: MQTTConfig = MQTTConfig()