diff --git a/src/bot/utils/draft_streamer.py b/src/bot/utils/draft_streamer.py index 4fe1e709..6f263ba6 100644 --- a/src/bot/utils/draft_streamer.py +++ b/src/bot/utils/draft_streamer.py @@ -1,4 +1,9 @@ -"""Stream partial responses to Telegram via sendMessageDraft.""" +"""Stream partial responses to Telegram via sendMessageDraft. + +Uses Telegram Bot API 9.3+ sendMessageDraft for smooth token-by-token +streaming in private chats. Falls back to editMessageText for group chats +where sendMessageDraft is unavailable. +""" import secrets import time @@ -14,6 +19,17 @@ # Max tool lines shown in the draft header _MAX_TOOL_LINES = 10 +# Minimum characters before sending the first draft (avoids triggering +# push notifications with just a few characters) +_MIN_INITIAL_CHARS = 20 + +# Error messages that indicate the draft transport is unavailable +_DRAFT_UNAVAILABLE_ERRORS = frozenset({ + "TEXTDRAFT_PEER_INVALID", + "Bad Request: draft can't be sent", + "Bad Request: peer doesn't support drafts", +}) + def generate_draft_id() -> int: """Generate a non-zero positive draft ID. @@ -30,18 +46,21 @@ class DraftStreamer: The draft is composed of two sections: 1. **Tool header** — compact lines showing tool calls and reasoning - snippets as they arrive, e.g. ``"📖 Read | 🔍 Grep | 🐚 Bash"``. + snippets as they arrive. 2. **Response body** — the actual assistant response text, streamed token-by-token. Both sections are combined into a single draft message and sent via - ``sendMessageDraft``. + ``sendMessageDraft`` (private chats) or ``editMessageText`` (groups). - Key design decisions: + Key design decisions (inspired by OpenClaw): - Plain text drafts (no parse_mode) to avoid partial HTML/markdown errors. - - Tail-truncation for messages >4096 chars: shows ``"\\u2026" + last 4093 chars``. - - Self-disabling: any API error silently disables the streamer so the - request continues with normal (non-streaming) delivery. + - Tail-truncation for messages >4096 chars. + - Min initial chars: waits for ~20 chars before first send. + - Anti-regressive: skips updates where text got shorter. + - Error classification: distinguishes draft-unavailable (fall back to edit) + from other errors (disable entirely). + - Self-disabling: persistent errors silently disable the streamer. """ def __init__( @@ -50,7 +69,8 @@ def __init__( chat_id: int, draft_id: int, message_thread_id: Optional[int] = None, - throttle_interval: float = 0.3, + throttle_interval: float = 0.4, + is_private_chat: bool = True, ) -> None: self.bot = bot self.chat_id = chat_id @@ -61,7 +81,18 @@ def __init__( self._tool_lines: List[str] = [] self._accumulated_text = "" self._last_send_time = 0.0 + self._last_sent_length = 0 # anti-regressive tracking self._enabled = True + self._error_count = 0 + self._max_errors = 3 + + # Transport mode: "draft" for private chats, "edit" for groups + self._use_draft = is_private_chat + self._edit_message_id: Optional[int] = None # for edit-based transport + + @property + def enabled(self) -> bool: + return self._enabled async def append_tool(self, line: str) -> None: """Append a tool activity line and send a draft if throttled.""" @@ -87,10 +118,14 @@ async def flush(self) -> None: return if not self._accumulated_text and not self._tool_lines: return - await self._send_draft() + await self._send_draft(force=True) + + def _compose_draft(self, is_final: bool = False) -> str: + """Combine tool header and response body into a single draft. - def _compose_draft(self) -> str: - """Combine tool header and response body into a single draft.""" + Appends a blinking cursor ▌ during streaming (like OpenClaw) + to indicate the response is still being generated. + """ parts: List[str] = [] if self._tool_lines: @@ -103,33 +138,157 @@ def _compose_draft(self) -> str: if self._accumulated_text: if parts: parts.append("") # blank separator line - parts.append(self._accumulated_text) + text = self._accumulated_text + if not is_final: + text += " ▌" + parts.append(text) return "\n".join(parts) - async def _send_draft(self) -> None: - """Send the composed draft (tools + text) as a message draft.""" + async def _send_draft(self, force: bool = False) -> None: + """Send the composed draft via the appropriate transport.""" draft_text = self._compose_draft() if not draft_text.strip(): return + # Min initial chars gate (skip if force-flushing) + if not force and self._last_sent_length == 0: + if len(self._accumulated_text) < _MIN_INITIAL_CHARS and not self._tool_lines: + return + + # Anti-regressive: skip if text got shorter (can happen with + # tool header rotation) + current_len = len(draft_text) + if not force and current_len < self._last_sent_length: + return + # Tail-truncate if over Telegram limit if len(draft_text) > TELEGRAM_MAX_MESSAGE_LENGTH: - draft_text = "\u2026" + draft_text[-(TELEGRAM_MAX_MESSAGE_LENGTH - 1) :] + draft_text = "\u2026" + draft_text[-(TELEGRAM_MAX_MESSAGE_LENGTH - 1):] try: + if self._use_draft: + await self._send_via_draft(draft_text) + else: + await self._send_via_edit(draft_text) + self._last_send_time = time.time() + self._last_sent_length = current_len + self._error_count = 0 # reset on success + except telegram.error.BadRequest as e: + error_str = str(e) + if any(err in error_str for err in _DRAFT_UNAVAILABLE_ERRORS): + # Draft transport unavailable — fall back to edit + logger.info( + "Draft transport unavailable, falling back to edit", + chat_id=self.chat_id, + error=error_str, + ) + self._use_draft = False + # Retry immediately with edit transport + try: + await self._send_via_edit(draft_text) + self._last_send_time = time.time() + self._last_sent_length = current_len + except Exception: + self._handle_error() + elif "Message is not modified" in error_str: + # Same content — not an error, just skip + self._last_send_time = time.time() + elif "Message to edit not found" in error_str: + # Message was deleted — re-create + self._edit_message_id = None + try: + await self._send_via_edit(draft_text) + self._last_send_time = time.time() + self._last_sent_length = current_len + except Exception: + self._handle_error() + else: + self._handle_error() + except Exception: + self._handle_error() + + def _handle_error(self) -> None: + """Track errors and disable after too many.""" + self._error_count += 1 + if self._error_count >= self._max_errors: + logger.debug( + "Draft streamer disabled after repeated errors", + chat_id=self.chat_id, + error_count=self._error_count, + ) + self._enabled = False + + async def _send_via_draft(self, text: str) -> None: + """Send via sendMessageDraft (private chats).""" + kwargs = { + "chat_id": self.chat_id, + "text": text, + "draft_id": self.draft_id, + } + if self.message_thread_id is not None: + kwargs["message_thread_id"] = self.message_thread_id + logger.debug( + "Sending draft", + transport="draft", + text_len=len(text), + preview=text[:80], + ) + await self.bot.send_message_draft(**kwargs) + + async def _send_via_edit(self, text: str) -> None: + """Send via editMessageText (group chat fallback). + + Creates a message on first call, then edits it on subsequent calls. + """ + if self._edit_message_id is None: + # Send initial message kwargs = { "chat_id": self.chat_id, - "text": draft_text, - "draft_id": self.draft_id, + "text": text, } if self.message_thread_id is not None: kwargs["message_thread_id"] = self.message_thread_id - await self.bot.send_message_draft(**kwargs) - self._last_send_time = time.time() - except Exception: - logger.debug( - "Draft send failed, disabling streamer", + msg = await self.bot.send_message(**kwargs) + self._edit_message_id = msg.message_id + else: + await self.bot.edit_message_text( + text, chat_id=self.chat_id, + message_id=self._edit_message_id, ) - self._enabled = False + + async def clear(self) -> None: + """Clear the draft bubble by sending an empty draft. + + Call this before sending the final response message so the draft + bubble disappears cleanly instead of overlapping with the real message. + """ + if not self._enabled: + return + try: + if self._use_draft: + # Send empty draft to dismiss the typing bubble + await self.bot.send_message_draft( + chat_id=self.chat_id, + text="", + draft_id=self.draft_id, + ) + elif self._edit_message_id is not None: + # For edit-based transport, delete the preview message + try: + await self.bot.delete_message( + chat_id=self.chat_id, + message_id=self._edit_message_id, + ) + except Exception: + pass + self._edit_message_id = None + except Exception: + pass + self._enabled = False + + @property + def edit_message_id(self) -> Optional[int]: + """Return the message ID used by edit transport (for cleanup).""" + return self._edit_message_id diff --git a/src/config/settings.py b/src/config/settings.py index 77c34ea4..9ba77b69 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -210,22 +210,23 @@ class Settings(BaseSettings): ), ) - # Output verbosity (0=quiet, 1=normal, 2=detailed) + # Output verbosity (0=quiet, 1=normal, 2=detailed, 3=full) verbose_level: int = Field( 1, description=( "Bot output verbosity: 0=quiet (final response only), " "1=normal (tool names + reasoning), " - "2=detailed (tool inputs + longer reasoning)" + "2=detailed (tool inputs + longer reasoning), " + "3=full (tool results + complete commands)" ), ge=0, - le=2, + le=3, ) - # Streaming drafts (Telegram sendMessageDraft) + # Streaming drafts (Telegram sendMessageDraft / editMessageText) enable_stream_drafts: bool = Field( False, - description="Stream partial responses via sendMessageDraft (private chats only)", + description="Stream partial responses to Telegram in real-time", ) stream_draft_interval: float = Field( 0.3,