diff --git a/README.md b/README.md index 3eaf46a..76d532d 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T - **export_chat_invite(chat_id)**: Export invite link - **import_chat_invite(hash)**: Join chat by invite hash - **join_chat_by_link(link)**: Join chat by invite link +- **subscribe_public_channel(channel)**: Subscribe to a public channel or supergroup by username or ID ### Messaging - **get_messages(chat_id, page, page_size)**: Paginated messages @@ -74,6 +75,8 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T - **get_pinned_messages(chat_id)**: List pinned messages - **get_last_interaction(contact_id)**: Most recent message with a contact - **create_poll(chat_id, question, options, multiple_choice, quiz_mode, public_votes, close_date)**: Create a poll +- **list_inline_buttons(chat_id, message_id, limit)**: Inspect inline keyboards to discover button text/index +- **press_inline_button(chat_id, message_id, button_text, button_index)**: Trigger inline keyboard callbacks by label or index ### Contact Management - **list_contacts()**: List all contacts @@ -313,6 +316,76 @@ Example output: Message sent successfully. ``` +### Listing Inline Buttons + +```python +@mcp.tool() +async def list_inline_buttons( + chat_id: Union[int, str], + message_id: Optional[int] = None, + limit: int = 20, +) -> str: + """ + Discover inline keyboard layout, including button indices, callback availability, and URLs. + """ +``` + +Example usage: +``` +list_inline_buttons(chat_id="@sample_tasks_bot") +``` + +This returns something like: +``` +Buttons for message 42 (date 2025-01-01 12:00:00+00:00): +[0] text='📋 View tasks', callback=yes +[1] text='ℹ️ Help', callback=yes +[2] text='🌐 Visit site', callback=no, url=https://example.org +``` + +### Pressing Inline Buttons + +```python +@mcp.tool() +async def press_inline_button( + chat_id: Union[int, str], + message_id: Optional[int] = None, + button_text: Optional[str] = None, + button_index: Optional[int] = None, +) -> str: + """ + Press an inline keyboard button by label or zero-based index. + If message_id is omitted, the server searches recent messages for the latest inline keyboard. + """ +``` + +Example usage: +``` +press_inline_button(chat_id="@sample_tasks_bot", button_text="📋 View tasks") +``` + +Use `list_inline_buttons` first if you need to inspect available buttons—pass a bogus `button_text` +to quickly list options or call `list_inline_buttons` directly. Once you know the text or index, +`press_inline_button` sends the callback, just like tapping the button in a native Telegram client. + +### Subscribing to Public Channels + +```python +@mcp.tool() +async def subscribe_public_channel(channel: Union[int, str]) -> str: + """ + Join a public channel or supergroup by username (e.g., "@examplechannel") or ID. + """ +``` + +Example usage: +``` +subscribe_public_channel(channel="@daily_updates_feed") +``` + +If the account is already a participant, the tool reports that instead of failing, making it safe to +run repeatedly in workflows that need idempotent joins. + ### Getting Chat Invite Links The `get_invite_link` function is particularly robust with multiple fallback methods: diff --git a/main.py b/main.py index 5de5278..ad5d8f8 100644 --- a/main.py +++ b/main.py @@ -391,6 +391,205 @@ async def send_message(chat_id: Union[int, str], message: str) -> str: return log_and_format_error("send_message", e, chat_id=chat_id) +@mcp.tool() +@validate_id("channel") +async def subscribe_public_channel(channel: Union[int, str]) -> str: + """ + Subscribe (join) to a public channel or supergroup by username or ID. + """ + try: + entity = await client.get_entity(channel) + await client(functions.channels.JoinChannelRequest(channel=entity)) + title = getattr(entity, "title", getattr(entity, "username", "Unknown channel")) + return f"Subscribed to {title}." + except telethon.errors.rpcerrorlist.UserAlreadyParticipantError: + title = getattr(entity, "title", getattr(entity, "username", "this channel")) + return f"Already subscribed to {title}." + except telethon.errors.rpcerrorlist.ChannelPrivateError: + return "Cannot subscribe: this channel is private or requires an invite link." + except Exception as e: + return log_and_format_error("subscribe_public_channel", e, channel=channel) + + +@mcp.tool() +@validate_id("chat_id") +async def list_inline_buttons( + chat_id: Union[int, str], message_id: Optional[Union[int, str]] = None, limit: int = 20 +) -> str: + """ + Inspect inline buttons on a recent message to discover their indices/text/URLs. + """ + try: + if isinstance(message_id, str): + if message_id.isdigit(): + message_id = int(message_id) + else: + return "message_id must be an integer." + + entity = await client.get_entity(chat_id) + target_message = None + + if message_id is not None: + target_message = await client.get_messages(entity, ids=message_id) + if isinstance(target_message, list): + target_message = target_message[0] if target_message else None + else: + recent_messages = await client.get_messages(entity, limit=limit) + target_message = next( + (msg for msg in recent_messages if getattr(msg, "buttons", None)), None + ) + + if not target_message: + return "No message with inline buttons found." + + buttons_attr = getattr(target_message, "buttons", None) + if not buttons_attr: + return f"Message {target_message.id} does not contain inline buttons." + + buttons = [btn for row in buttons_attr for btn in row] + if not buttons: + return f"Message {target_message.id} does not contain inline buttons." + + lines = [ + f"Buttons for message {target_message.id} (date {target_message.date}):", + ] + for idx, btn in enumerate(buttons): + raw_button = getattr(btn, "button", None) + text = getattr(btn, "text", "") or "" + url = getattr(raw_button, "url", None) if raw_button else None + has_callback = bool(getattr(btn, "data", None)) + parts = [f"[{idx}] text='{text}'"] + parts.append("callback=yes" if has_callback else "callback=no") + if url: + parts.append(f"url={url}") + lines.append(", ".join(parts)) + + return "\n".join(lines) + except Exception as e: + return log_and_format_error( + "list_inline_buttons", + e, + chat_id=chat_id, + message_id=message_id, + limit=limit, + ) + + +@mcp.tool() +@validate_id("chat_id") +async def press_inline_button( + chat_id: Union[int, str], + message_id: Optional[Union[int, str]] = None, + button_text: Optional[str] = None, + button_index: Optional[int] = None, +) -> str: + """ + Press an inline button (callback) in a chat message. + + Args: + chat_id: Chat or bot where the inline keyboard exists. + message_id: Specific message ID to inspect. If omitted, searches recent messages for one containing buttons. + button_text: Exact text of the button to press (case-insensitive). + button_index: Zero-based index among all buttons if you prefer positional access. + """ + try: + if button_text is None and button_index is None: + return "Provide button_text or button_index to choose a button." + + # Normalize message_id if provided as a string + if isinstance(message_id, str): + if message_id.isdigit(): + message_id = int(message_id) + else: + return "message_id must be an integer." + + if isinstance(button_index, str): + if button_index.isdigit(): + button_index = int(button_index) + else: + return "button_index must be an integer." + + entity = await client.get_entity(chat_id) + + target_message = None + if message_id is not None: + target_message = await client.get_messages(entity, ids=message_id) + if isinstance(target_message, list): + target_message = target_message[0] if target_message else None + else: + recent_messages = await client.get_messages(entity, limit=20) + target_message = next( + (msg for msg in recent_messages if getattr(msg, "buttons", None)), None + ) + + if not target_message: + return "No message with inline buttons found. Specify message_id to target a specific message." + + buttons_attr = getattr(target_message, "buttons", None) + if not buttons_attr: + return f"Message {target_message.id} does not contain inline buttons." + + buttons = [btn for row in buttons_attr for btn in row] + if not buttons: + return f"Message {target_message.id} does not contain inline buttons." + + target_button = None + if button_text: + normalized = button_text.strip().lower() + target_button = next( + ( + btn + for btn in buttons + if (getattr(btn, "text", "") or "").strip().lower() == normalized + ), + None, + ) + + if target_button is None and button_index is not None: + if button_index < 0 or button_index >= len(buttons): + return f"button_index out of range. Valid indices: 0-{len(buttons) - 1}." + target_button = buttons[button_index] + + if not target_button: + available = ", ".join( + f"[{idx}] {getattr(btn, 'text', '') or ''}" + for idx, btn in enumerate(buttons) + ) + return f"Button not found. Available buttons: {available}" + + if not getattr(target_button, "data", None): + raw_button = getattr(target_button, "button", None) + url = getattr(raw_button, "url", None) if raw_button else None + if url: + return f"Selected button opens a URL instead of sending a callback: {url}" + return "Selected button does not provide callback data to press." + + callback_result = await client( + functions.messages.GetBotCallbackAnswerRequest( + peer=entity, msg_id=target_message.id, data=target_button.data + ) + ) + + response_parts = [] + if getattr(callback_result, "message", None): + response_parts.append(callback_result.message) + if getattr(callback_result, "alert", None): + response_parts.append("Telegram displayed an alert to the user.") + if not response_parts: + response_parts.append("Button pressed successfully.") + + return " ".join(response_parts) + except Exception as e: + return log_and_format_error( + "press_inline_button", + e, + chat_id=chat_id, + message_id=message_id, + button_text=button_text, + button_index=button_index, + ) + + @mcp.tool() async def list_contacts() -> str: """