diff --git a/src/fastmcp/cli/client.py b/src/fastmcp/cli/client.py index 9a14e4f06..cc43b3aff 100644 --- a/src/fastmcp/cli/client.py +++ b/src/fastmcp/cli/client.py @@ -10,6 +10,7 @@ import cyclopts import mcp.types from rich.console import Console +from rich.markup import escape as escape_rich_markup from fastmcp.cli.discovery import DiscoveredServer, discover_servers, resolve_name from fastmcp.client.client import CallToolResult, Client @@ -405,15 +406,30 @@ def _print_schema(label: str, schema: dict[str, Any]) -> None: console.print(f" [dim]{label}: {json.dumps(schema)}[/dim]") +def _sanitize_untrusted_text(value: str) -> str: + """Escape rich markup and encode control chars for terminal-safe output.""" + sanitized = escape_rich_markup(value) + return "".join( + ch + if ch in {"\n", "\t"} or (0x20 <= ord(ch) < 0x7F) or ord(ch) > 0x9F + else f"\\x{ord(ch):02x}" + for ch in sanitized + ) + + def _format_call_result_text(result: CallToolResult) -> None: """Pretty-print a tool call result to the console.""" if result.is_error: for block in result.content: if isinstance(block, mcp.types.TextContent): - console.print(f"[bold red]Error:[/bold red] {block.text}") + console.print( + f"[bold red]Error:[/bold red] {_sanitize_untrusted_text(block.text)}" + ) else: - console.print(f"[bold red]Error:[/bold red] {block}") + console.print( + f"[bold red]Error:[/bold red] {_sanitize_untrusted_text(str(block))}" + ) return if result.structured_content is not None: @@ -422,7 +438,7 @@ def _format_call_result_text(result: CallToolResult) -> None: for block in result.content: if isinstance(block, mcp.types.TextContent): - console.print(block.text) + console.print(_sanitize_untrusted_text(block.text)) elif isinstance(block, mcp.types.ImageContent): size = len(block.data) * 3 // 4 # rough decoded size console.print(f"[dim][Image: {block.mimeType}, ~{size} bytes][/dim]") @@ -430,7 +446,7 @@ def _format_call_result_text(result: CallToolResult) -> None: size = len(block.data) * 3 // 4 console.print(f"[dim][Audio: {block.mimeType}, ~{size} bytes][/dim]") else: - console.print(str(block)) + console.print(_sanitize_untrusted_text(str(block))) def _content_block_to_dict(block: mcp.types.ContentBlock) -> dict[str, Any]: @@ -554,7 +570,7 @@ async def _handle_resource( for block in contents: if isinstance(block, mcp.types.TextResourceContents): - console.print(block.text) + console.print(_sanitize_untrusted_text(block.text)) elif isinstance(block, mcp.types.BlobResourceContents): size = len(block.blob) * 3 // 4 console.print(f"[dim][Blob: {block.mimeType}, ~{size} bytes][/dim]") @@ -604,16 +620,16 @@ async def _handle_prompt( return for msg in result.messages: - console.print(f"[bold]{msg.role}:[/bold]") + console.print(f"[bold]{_sanitize_untrusted_text(msg.role)}:[/bold]") if isinstance(msg.content, mcp.types.TextContent): - console.print(f" {msg.content.text}") + console.print(f" {_sanitize_untrusted_text(msg.content.text)}") elif isinstance(msg.content, mcp.types.ImageContent): size = len(msg.content.data) * 3 // 4 console.print( f" [dim][Image: {msg.content.mimeType}, ~{size} bytes][/dim]" ) else: - console.print(f" {msg.content}") + console.print(f" {_sanitize_untrusted_text(str(msg.content))}") console.print() @@ -727,9 +743,11 @@ async def list_command( console.print() for tool in tools: sig = format_tool_signature(tool) - console.print(f" [cyan]{sig}[/cyan]") + console.print(f" [cyan]{_sanitize_untrusted_text(sig)}[/cyan]") if tool.description: - console.print(f" {tool.description}") + console.print( + f" {_sanitize_untrusted_text(tool.description)}" + ) if input_schema: _print_schema("Input", tool.inputSchema) if output_schema and tool.outputSchema: @@ -743,11 +761,13 @@ async def list_command( if not res: console.print(" [dim]No resources found.[/dim]") for r in res: - console.print(f" [cyan]{r.uri}[/cyan]") + console.print( + f" [cyan]{_sanitize_untrusted_text(str(r.uri))}[/cyan]" + ) desc_parts = [r.name or "", r.description or ""] desc = " — ".join(p for p in desc_parts if p) if desc: - console.print(f" {desc}") + console.print(f" {_sanitize_untrusted_text(desc)}") console.print() if prompts: @@ -761,9 +781,11 @@ async def list_command( if p.arguments: parts = [a.name for a in p.arguments] args_str = f"({', '.join(parts)})" - console.print(f" [cyan]{p.name}{args_str}[/cyan]") + console.print( + f" [cyan]{_sanitize_untrusted_text(p.name + args_str)}[/cyan]" + ) if p.description: - console.print(f" {p.description}") + console.print(f" {_sanitize_untrusted_text(p.description)}") console.print() except Exception as exc: diff --git a/tests/cli/test_client_commands.py b/tests/cli/test_client_commands.py index 1add45ba2..a0d0745d7 100644 --- a/tests/cli/test_client_commands.py +++ b/tests/cli/test_client_commands.py @@ -16,6 +16,7 @@ _build_stdio_from_command, _format_call_result_text, _is_http_target, + _sanitize_untrusted_text, call_command, coerce_value, format_tool_signature, @@ -555,3 +556,27 @@ def test_structured_content_uses_dict_not_data( _format_call_result_text(result) captured = capsys.readouterr() assert "value" in captured.out + + def test_escapes_rich_markup_and_control_chars( + self, capsys: pytest.CaptureFixture[str] + ): + result = CallToolResult( + content=[mcp.types.TextContent(type="text", text="[red]x[/red]\x1b[2J")], + structured_content=None, + meta=None, + data=None, + is_error=False, + ) + + _format_call_result_text(result) + captured = capsys.readouterr() + assert "[red]x[/red]" in captured.out + assert "\\x1b" in captured.out + assert "\x1b" not in captured.out + + +class TestSanitizeUntrustedText: + def test_sanitize_untrusted_text(self): + value = "[bold]hello[/bold]\x07" + sanitized = _sanitize_untrusted_text(value) + assert sanitized == "\\[bold]hello\\[/bold]\\x07"