Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions src/fastmcp/cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import cyclopts
import mcp.types
from rich.console import Console
from rich.markup import escape as escape_rich_markup

from fastmcp.cli.discovery import DiscoveredServer, discover_servers, resolve_name
from fastmcp.client.client import CallToolResult, Client
Expand Down Expand Up @@ -405,15 +406,30 @@ def _print_schema(label: str, schema: dict[str, Any]) -> None:
console.print(f" [dim]{label}: {json.dumps(schema)}[/dim]")


def _sanitize_untrusted_text(value: str) -> str:
"""Escape rich markup and encode control chars for terminal-safe output."""
sanitized = escape_rich_markup(value)
return "".join(
ch
if ch in {"\n", "\t"} or (0x20 <= ord(ch) < 0x7F) or ord(ch) > 0x9F
else f"\\x{ord(ch):02x}"
for ch in sanitized
)


def _format_call_result_text(result: CallToolResult) -> None:
"""Pretty-print a tool call result to the console."""

if result.is_error:
for block in result.content:
if isinstance(block, mcp.types.TextContent):
console.print(f"[bold red]Error:[/bold red] {block.text}")
console.print(
f"[bold red]Error:[/bold red] {_sanitize_untrusted_text(block.text)}"
)
else:
console.print(f"[bold red]Error:[/bold red] {block}")
console.print(
f"[bold red]Error:[/bold red] {_sanitize_untrusted_text(str(block))}"
)
return

if result.structured_content is not None:
Expand All @@ -422,15 +438,15 @@ def _format_call_result_text(result: CallToolResult) -> None:

for block in result.content:
if isinstance(block, mcp.types.TextContent):
console.print(block.text)
console.print(_sanitize_untrusted_text(block.text))
elif isinstance(block, mcp.types.ImageContent):
size = len(block.data) * 3 // 4 # rough decoded size
console.print(f"[dim][Image: {block.mimeType}, ~{size} bytes][/dim]")
elif isinstance(block, mcp.types.AudioContent):
size = len(block.data) * 3 // 4
console.print(f"[dim][Audio: {block.mimeType}, ~{size} bytes][/dim]")
else:
console.print(str(block))
console.print(_sanitize_untrusted_text(str(block)))


def _content_block_to_dict(block: mcp.types.ContentBlock) -> dict[str, Any]:
Expand Down Expand Up @@ -554,7 +570,7 @@ async def _handle_resource(

for block in contents:
if isinstance(block, mcp.types.TextResourceContents):
console.print(block.text)
console.print(_sanitize_untrusted_text(block.text))
elif isinstance(block, mcp.types.BlobResourceContents):
size = len(block.blob) * 3 // 4
console.print(f"[dim][Blob: {block.mimeType}, ~{size} bytes][/dim]")
Expand Down Expand Up @@ -604,16 +620,16 @@ async def _handle_prompt(
return

for msg in result.messages:
console.print(f"[bold]{msg.role}:[/bold]")
console.print(f"[bold]{_sanitize_untrusted_text(msg.role)}:[/bold]")
if isinstance(msg.content, mcp.types.TextContent):
console.print(f" {msg.content.text}")
console.print(f" {_sanitize_untrusted_text(msg.content.text)}")
elif isinstance(msg.content, mcp.types.ImageContent):
size = len(msg.content.data) * 3 // 4
console.print(
f" [dim][Image: {msg.content.mimeType}, ~{size} bytes][/dim]"
)
else:
console.print(f" {msg.content}")
console.print(f" {_sanitize_untrusted_text(str(msg.content))}")
console.print()


Expand Down Expand Up @@ -727,9 +743,11 @@ async def list_command(
console.print()
for tool in tools:
sig = format_tool_signature(tool)
console.print(f" [cyan]{sig}[/cyan]")
console.print(f" [cyan]{_sanitize_untrusted_text(sig)}[/cyan]")
if tool.description:
console.print(f" {tool.description}")
console.print(
f" {_sanitize_untrusted_text(tool.description)}"
)
if input_schema:
_print_schema("Input", tool.inputSchema)
if output_schema and tool.outputSchema:
Expand All @@ -743,11 +761,13 @@ async def list_command(
if not res:
console.print(" [dim]No resources found.[/dim]")
for r in res:
console.print(f" [cyan]{r.uri}[/cyan]")
console.print(
f" [cyan]{_sanitize_untrusted_text(str(r.uri))}[/cyan]"
)
desc_parts = [r.name or "", r.description or ""]
desc = " — ".join(p for p in desc_parts if p)
if desc:
console.print(f" {desc}")
console.print(f" {_sanitize_untrusted_text(desc)}")
console.print()

if prompts:
Expand All @@ -761,9 +781,11 @@ async def list_command(
if p.arguments:
parts = [a.name for a in p.arguments]
args_str = f"({', '.join(parts)})"
console.print(f" [cyan]{p.name}{args_str}[/cyan]")
console.print(
f" [cyan]{_sanitize_untrusted_text(p.name + args_str)}[/cyan]"
)
if p.description:
console.print(f" {p.description}")
console.print(f" {_sanitize_untrusted_text(p.description)}")
console.print()

except Exception as exc:
Expand Down
25 changes: 25 additions & 0 deletions tests/cli/test_client_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
_build_stdio_from_command,
_format_call_result_text,
_is_http_target,
_sanitize_untrusted_text,
call_command,
coerce_value,
format_tool_signature,
Expand Down Expand Up @@ -555,3 +556,27 @@ def test_structured_content_uses_dict_not_data(
_format_call_result_text(result)
captured = capsys.readouterr()
assert "value" in captured.out

def test_escapes_rich_markup_and_control_chars(
self, capsys: pytest.CaptureFixture[str]
):
result = CallToolResult(
content=[mcp.types.TextContent(type="text", text="[red]x[/red]\x1b[2J")],
structured_content=None,
meta=None,
data=None,
is_error=False,
)

_format_call_result_text(result)
captured = capsys.readouterr()
assert "[red]x[/red]" in captured.out
assert "\\x1b" in captured.out
assert "\x1b" not in captured.out


class TestSanitizeUntrustedText:
def test_sanitize_untrusted_text(self):
value = "[bold]hello[/bold]\x07"
sanitized = _sanitize_untrusted_text(value)
assert sanitized == "\\[bold]hello\\[/bold]\\x07"