From 9545827ac58164b0353b6710708c4f87a22dddc2 Mon Sep 17 00:00:00 2001
From: Claude
Date: Wed, 24 Dec 2025 11:16:31 +0000
Subject: [PATCH 1/5] feat(anthropic): add extended thinking support for
 Claude 3.7+ models

Add comprehensive support for Anthropic's Extended Thinking feature:

- Add ThinkingBlock, ThinkingDelta, and related type imports
- Add thinking_enabled and thinking_budget_tokens config options
- Update model database with anthropic_thinking reasoning mode for
  Claude 3.7 Sonnet and Claude 4+
- Handle ThinkingDelta streaming with is_reasoning=True
- Store thinking segments in REASONING channel for UI display
- Store raw thinking blocks for tool use passback (API verification)
- Add interleaved-thinking-2025-05-14 beta header for tool use
- Update multipart converter to preserve thinking blocks in assistant messages
- Extract thinking_tokens in usage tracking
- Disable thinking when structured output forces tool choice (the two
  are mutually exclusive)

Extended thinking enables Claude to show step-by-step reasoning while
maintaining compatibility with tool use through interleaved thinking.
---
 src/fast_agent/config.py                      |  14 ++
 src/fast_agent/constants.py                   |   2 +
 src/fast_agent/llm/model_database.py          |  23 +-
 .../llm/provider/anthropic/llm_anthropic.py   | 227 +++++++++++++++---
 .../multipart_converter_anthropic.py          |  27 ++-
 src/fast_agent/llm/usage_tracking.py          |   5 +
 6 files changed, 253 insertions(+), 45 deletions(-)

diff --git a/src/fast_agent/config.py b/src/fast_agent/config.py
index 4ba426f2..58fad7b2 100644
--- a/src/fast_agent/config.py
+++ b/src/fast_agent/config.py
@@ -296,6 +296,20 @@ class AnthropicSettings(BaseModel):
     - "auto": Currently same as "prompt" - caches tools+system prompt (1 block) and template content.
     """
 
+    thinking_enabled: bool = False
+    """
+    Enable extended thinking for supported Claude models (Claude 3.7 Sonnet, Sonnet 4+, Opus 4+).
+    When enabled, Claude will show its step-by-step reasoning process.
+    Note: Extended thinking is incompatible with structured output (forced tool choice).
+    """
+
+    thinking_budget_tokens: int = 10000
+    """
+    Maximum tokens for Claude's internal reasoning process (minimum 1024).
+    Larger budgets enable more thorough analysis for complex problems.
+    Must be less than max_tokens.
+ """ + model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True) diff --git a/src/fast_agent/constants.py b/src/fast_agent/constants.py index 3b77e0ea..73f1a431 100644 --- a/src/fast_agent/constants.py +++ b/src/fast_agent/constants.py @@ -6,6 +6,8 @@ HUMAN_INPUT_TOOL_NAME = "__human_input" MCP_UI = "mcp-ui" REASONING = "reasoning" +ANTHROPIC_THINKING_BLOCKS = "anthropic-thinking-raw" +"""Raw Anthropic thinking blocks with signatures for tool use passback.""" FAST_AGENT_ERROR_CHANNEL = "fast-agent-error" FAST_AGENT_REMOVED_METADATA_CHANNEL = "fast-agent-removed-meta" FAST_AGENT_TIMING = "fast-agent-timing" diff --git a/src/fast_agent/llm/model_database.py b/src/fast_agent/llm/model_database.py index 0ae3c9d2..a65e696f 100644 --- a/src/fast_agent/llm/model_database.py +++ b/src/fast_agent/llm/model_database.py @@ -158,10 +158,23 @@ class ModelDatabase: ) ANTHROPIC_OPUS_4_VERSIONED = ModelParameters( - context_window=200000, max_output_tokens=32000, tokenizes=ANTHROPIC_MULTIMODAL + context_window=200000, + max_output_tokens=32000, + tokenizes=ANTHROPIC_MULTIMODAL, + reasoning="anthropic_thinking", ) ANTHROPIC_SONNET_4_VERSIONED = ModelParameters( - context_window=200000, max_output_tokens=64000, tokenizes=ANTHROPIC_MULTIMODAL + context_window=200000, + max_output_tokens=64000, + tokenizes=ANTHROPIC_MULTIMODAL, + reasoning="anthropic_thinking", + ) + # Claude 3.7 Sonnet supports extended thinking (deprecated but still available) + ANTHROPIC_37_SERIES_THINKING = ModelParameters( + context_window=200000, + max_output_tokens=16384, + tokenizes=ANTHROPIC_MULTIMODAL, + reasoning="anthropic_thinking", ) DEEPSEEK_CHAT_STANDARD = ModelParameters( @@ -315,9 +328,9 @@ class ModelDatabase: "claude-3-5-sonnet-20240620": ANTHROPIC_35_SERIES, "claude-3-5-sonnet-20241022": ANTHROPIC_35_SERIES, "claude-3-5-sonnet-latest": ANTHROPIC_35_SERIES, - "claude-3-7-sonnet": ANTHROPIC_37_SERIES, - "claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES, - "claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES, + "claude-3-7-sonnet": ANTHROPIC_37_SERIES_THINKING, + "claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES_THINKING, + "claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES_THINKING, "claude-sonnet-4-0": ANTHROPIC_SONNET_4_VERSIONED, "claude-sonnet-4-20250514": ANTHROPIC_SONNET_4_VERSIONED, "claude-sonnet-4-5": ANTHROPIC_SONNET_4_VERSIONED, diff --git a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py index fc02191d..147967f9 100644 --- a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py +++ b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py @@ -15,9 +15,14 @@ RawContentBlockStartEvent, RawContentBlockStopEvent, RawMessageDeltaEvent, + RedactedThinkingBlock, + SignatureDelta, TextBlock, TextBlockParam, TextDelta, + ThinkingBlock, + ThinkingBlockParam, + ThinkingDelta, ToolParam, ToolUseBlock, ToolUseBlockParam, @@ -32,7 +37,7 @@ TextContent, ) -from fast_agent.constants import FAST_AGENT_ERROR_CHANNEL +from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS, FAST_AGENT_ERROR_CHANNEL, REASONING from fast_agent.core.exceptions import ProviderKeyError from fast_agent.core.logging.logger import get_logger from fast_agent.core.prompt import Prompt @@ -137,6 +142,35 @@ def _get_cache_mode(self) -> str: cache_mode = self.context.config.anthropic.cache_mode return cache_mode + def _is_thinking_enabled(self, model: str) -> bool: + """Check if extended thinking should be enabled for this request. + + Extended thinking is enabled when: + 1. 
The model supports it (has reasoning="anthropic_thinking" in database) + 2. thinking_enabled is True in config + """ + from fast_agent.llm.model_database import ModelDatabase + + # Check if model supports extended thinking + reasoning_mode = ModelDatabase.get_reasoning(model) + if reasoning_mode != "anthropic_thinking": + return False + + # Check config setting + if self.context.config and self.context.config.anthropic: + return self.context.config.anthropic.thinking_enabled + return False + + def _get_thinking_budget(self) -> int: + """Get the thinking budget tokens (minimum 1024). + + Returns the configured budget or default of 10000. + """ + if self.context.config and self.context.config.anthropic: + budget = getattr(self.context.config.anthropic, "thinking_budget_tokens", 10000) + return max(1024, budget) # Enforce minimum + return 10000 # Default + async def _prepare_tools( self, structured_model: Type[ModelT] | None = None, tools: list[Tool] | None = None ) -> list[ToolParam]: @@ -280,11 +314,18 @@ async def _process_stream( stream: AsyncMessageStream, model: str, capture_filename: Path | None = None, - ) -> Message: - """Process the streaming response and display real-time token usage.""" + ) -> tuple[Message, list[str]]: + """Process the streaming response and display real-time token usage. + + Returns: + Tuple of (Message, list of thinking text segments) + """ # Track estimated output tokens by counting text chunks estimated_tokens = 0 tool_streams: dict[int, dict[str, Any]] = {} + # Track thinking content for extended thinking + thinking_segments: list[str] = [] + thinking_indices: set[int] = set() # Track which indices are thinking blocks try: # Process the raw event stream to get token counts @@ -295,6 +336,25 @@ async def _process_stream( if isinstance(event, RawContentBlockStartEvent): content_block = event.content_block + + # Handle ThinkingBlock start (extended thinking) + if isinstance(content_block, ThinkingBlock): + thinking_indices.add(event.index) + self.logger.debug( + "Started thinking block", + data={"index": event.index}, + ) + continue + + # Handle RedactedThinkingBlock (encrypted thinking) + if isinstance(content_block, RedactedThinkingBlock): + thinking_indices.add(event.index) + self.logger.debug( + "Received redacted thinking block", + data={"index": event.index}, + ) + continue + if isinstance(content_block, ToolUseBlock): tool_streams[event.index] = { "name": content_block.name, @@ -325,6 +385,28 @@ async def _process_stream( if isinstance(event, RawContentBlockDeltaEvent): delta = event.delta + + # Handle ThinkingDelta - stream reasoning content + if isinstance(delta, ThinkingDelta): + thinking_text = delta.thinking + if thinking_text: + # Emit as reasoning chunk for UI streaming + self._notify_stream_listeners( + StreamChunk(text=thinking_text, is_reasoning=True) + ) + thinking_segments.append(thinking_text) + continue + + # Handle SignatureDelta (end of thinking block verification) + if isinstance(delta, SignatureDelta): + # Signature marks the end of a thinking block + # The signature is used for verification when passing blocks back + self.logger.debug( + "Received thinking signature", + data={"index": event.index}, + ) + continue + if isinstance(delta, InputJSONDelta): info = tool_streams.get(event.index) if info is not None: @@ -351,42 +433,52 @@ async def _process_stream( ) continue - if isinstance(event, RawContentBlockStopEvent) and event.index in tool_streams: - info = tool_streams.pop(event.index) - preview_raw = "".join(info.get("buffer", [])) - 
if preview_raw: - preview = ( - preview_raw if len(preview_raw) <= 120 else preview_raw[:117] + "..." - ) + if isinstance(event, RawContentBlockStopEvent): + # Handle thinking block stop + if event.index in thinking_indices: + thinking_indices.discard(event.index) self.logger.debug( - "Completed tool input stream", + "Completed thinking block", + data={"index": event.index}, + ) + continue + + if event.index in tool_streams: + info = tool_streams.pop(event.index) + preview_raw = "".join(info.get("buffer", [])) + if preview_raw: + preview = ( + preview_raw if len(preview_raw) <= 120 else preview_raw[:117] + "..." + ) + self.logger.debug( + "Completed tool input stream", + data={ + "tool_name": info.get("name"), + "tool_use_id": info.get("id"), + "input_preview": preview, + }, + ) + self._notify_tool_stream_listeners( + "stop", + { + "tool_name": info.get("name"), + "tool_use_id": info.get("id"), + "index": event.index, + "streams_arguments": False, + }, + ) + self.logger.info( + "Model finished streaming tool input", data={ + "progress_action": ProgressAction.CALLING_TOOL, + "agent_name": self.name, + "model": model, "tool_name": info.get("name"), "tool_use_id": info.get("id"), - "input_preview": preview, + "tool_event": "stop", }, ) - self._notify_tool_stream_listeners( - "stop", - { - "tool_name": info.get("name"), - "tool_use_id": info.get("id"), - "index": event.index, - "streams_arguments": False, - }, - ) - self.logger.info( - "Model finished streaming tool input", - data={ - "progress_action": ProgressAction.CALLING_TOOL, - "agent_name": self.name, - "model": model, - "tool_name": info.get("name"), - "tool_use_id": info.get("id"), - "tool_event": "stop", - }, - ) - continue + continue # Count tokens in real-time from content_block_delta events if isinstance(event, RawContentBlockDeltaEvent): @@ -432,7 +524,7 @@ async def _process_stream( f"Streaming complete - Model: {model}, Input tokens: {message.usage.input_tokens}, Output tokens: {message.usage.output_tokens}" ) - return message + return message, thinking_segments except APIError as error: logger.error("Streaming APIError during Anthropic completion", exc_info=error) raise # Re-raise to be handled by _anthropic_completion @@ -564,7 +656,7 @@ async def _anthropic_completion( model = self.default_request_params.model or DEFAULT_ANTHROPIC_MODEL # Create base arguments dictionary - base_args = { + base_args: dict[str, Any] = { "model": model, "messages": messages, "stop_sequences": params.stopSequences, @@ -574,12 +666,43 @@ async def _anthropic_completion( if self.instruction or params.systemPrompt: base_args["system"] = self.instruction or params.systemPrompt + # Check if extended thinking should be enabled + # Note: Extended thinking is incompatible with forced tool choice (structured output) + thinking_enabled = self._is_thinking_enabled(model) + if structured_model: + if thinking_enabled: + # Cannot use extended thinking with structured output + logger.warning( + "Extended thinking is incompatible with structured output. " + "Disabling thinking for this request." 
+ ) + thinking_enabled = False base_args["tool_choice"] = {"type": "tool", "name": STRUCTURED_OUTPUT_TOOL_NAME} - if params.maxTokens is not None: + # Add thinking configuration if enabled + if thinking_enabled: + thinking_budget = self._get_thinking_budget() + base_args["thinking"] = { + "type": "enabled", + "budget_tokens": thinking_budget, + } + # Ensure max_tokens is set and greater than budget + # The budget must be less than max_tokens + current_max = params.maxTokens or 16000 + if current_max <= thinking_budget: + # Increase max_tokens to accommodate thinking budget + response + base_args["max_tokens"] = thinking_budget + 8192 + else: + base_args["max_tokens"] = current_max + elif params.maxTokens is not None: base_args["max_tokens"] = params.maxTokens + # Add interleaved thinking beta header when using tools with thinking + # This enables thinking between tool calls for more sophisticated reasoning + if thinking_enabled and available_tools: + base_args["extra_headers"] = {"anthropic-beta": "interleaved-thinking-2025-05-14"} + self._log_chat_progress(self.chat_turn(), model=model) # Use the base class method to prepare all arguments with Anthropic-specific exclusions # Do this BEFORE applying cache control so metadata doesn't override cached fields @@ -614,10 +737,13 @@ async def _anthropic_completion( capture_filename = _stream_capture_filename(self.chat_turn()) # Use streaming API with helper + thinking_segments: list[str] = [] try: async with anthropic.messages.stream(**arguments) as stream: # Process the stream - response = await self._process_stream(stream, model, capture_filename) + response, thinking_segments = await self._process_stream( + stream, model, capture_filename + ) except asyncio.CancelledError as e: reason = str(e) if e.args else "cancelled" logger.info(f"Anthropic completion cancelled: {reason}") @@ -695,8 +821,31 @@ async def _anthropic_completion( self._log_chat_finished(model=model) - return Prompt.assistant( - *response_content_blocks, stop_reason=stop_reason, tool_calls=tool_calls + # Build channels dict with thinking content if available + channels: dict[str, list[ContentBlock] | list[Any]] | None = None + if thinking_segments: + # Store thinking segments in the REASONING channel for display + channels = {REASONING: [TextContent(type="text", text="".join(thinking_segments))]} + + # Extract raw thinking blocks from response for tool use passback + # These need to be preserved with their signatures for API verification + raw_thinking_blocks: list[Any] = [] + for content_block in response.content: + if isinstance(content_block, (ThinkingBlock, RedactedThinkingBlock)): + raw_thinking_blocks.append(content_block) + + if raw_thinking_blocks: + if channels is None: + channels = {} + # Store raw blocks for passback in tool use scenarios + channels[ANTHROPIC_THINKING_BLOCKS] = raw_thinking_blocks + + return PromptMessageExtended( + role="assistant", + content=response_content_blocks, + tool_calls=tool_calls, + channels=channels, + stop_reason=stop_reason, ) async def _apply_prompt_provider_specific( diff --git a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py index 9f532578..23bfa450 100644 --- a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py +++ b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py @@ -10,7 +10,10 @@ ImageBlockParam, MessageParam, PlainTextSourceParam, + RedactedThinkingBlock, TextBlockParam, + ThinkingBlock, + 
ThinkingBlockParam, ToolResultBlockParam, ToolUseBlockParam, URLImageSourceParam, @@ -27,6 +30,7 @@ TextResourceContents, ) +from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS from fast_agent.core.logging.logger import get_logger from fast_agent.mcp.helpers.content_helpers import ( get_image_data, @@ -76,12 +80,33 @@ def convert_to_anthropic(multipart_msg: PromptMessageExtended) -> MessageParam: An Anthropic API MessageParam object """ role = multipart_msg.role - all_content_blocks = [] + all_content_blocks: list = [] # If this is an assistant message that contains tool_calls, convert # those into Anthropic tool_use blocks so the next user message can # legally include corresponding tool_result blocks. if role == "assistant" and multipart_msg.tool_calls: + # CRITICAL: Thinking blocks must come FIRST in assistant messages + # when using extended thinking with tool use + if multipart_msg.channels: + raw_thinking = multipart_msg.channels.get(ANTHROPIC_THINKING_BLOCKS) + if raw_thinking: + for thinking_block in raw_thinking: + # Pass through raw ThinkingBlock/RedactedThinkingBlock + # These contain signatures needed for API verification + if isinstance(thinking_block, ThinkingBlock): + all_content_blocks.append( + ThinkingBlockParam( + type="thinking", + thinking=thinking_block.thinking, + signature=thinking_block.signature, + ) + ) + elif isinstance(thinking_block, RedactedThinkingBlock): + # Redacted thinking blocks are passed as-is + # They contain encrypted data that the API can verify + all_content_blocks.append(thinking_block) + for tool_use_id, req in multipart_msg.tool_calls.items(): sanitized_id = AnthropicConverter._sanitize_tool_id(tool_use_id) params = req.params diff --git a/src/fast_agent/llm/usage_tracking.py b/src/fast_agent/llm/usage_tracking.py index e2672528..ffc491f5 100644 --- a/src/fast_agent/llm/usage_tracking.py +++ b/src/fast_agent/llm/usage_tracking.py @@ -149,6 +149,10 @@ def from_anthropic(cls, usage: AnthropicUsage, model: str) -> "TurnUsage": cache_write_tokens=cache_creation_tokens, # Tokens written to cache (25% surcharge) ) + # Extract thinking/reasoning tokens if available (extended thinking feature) + # Note: For Claude 4 models, you're billed for full thinking tokens, not summaries + thinking_tokens = getattr(usage, "thinking_tokens", 0) or 0 + return cls( provider=Provider.ANTHROPIC, model=model, @@ -156,6 +160,7 @@ def from_anthropic(cls, usage: AnthropicUsage, model: str) -> "TurnUsage": output_tokens=usage.output_tokens, total_tokens=usage.input_tokens + usage.output_tokens, cache_usage=cache_usage, + reasoning_tokens=thinking_tokens, raw_usage=usage, # Store the original Anthropic usage object ) From c3cba273095dc322dbdda78ddd000f82f3d808d7 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Fri, 26 Dec 2025 13:19:54 +0000 Subject: [PATCH 2/5] interim --- src/fast_agent/llm/provider/anthropic/llm_anthropic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py index 147967f9..b1ed6c7f 100644 --- a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py +++ b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py @@ -21,7 +21,6 @@ TextBlockParam, TextDelta, ThinkingBlock, - ThinkingBlockParam, ThinkingDelta, ToolParam, ToolUseBlock, From a9a06c7118244534059e7366e552f550fa7da621 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Fri, 26 Dec 2025 21:12:00 +0000 
Subject: [PATCH 3/5] fix anthropic thinking --- src/fast_agent/mcp/prompt_message_extended.py | 5 ++-- src/fast_agent/ui/interactive_prompt.py | 2 +- src/fast_agent/ui/streaming.py | 23 +++++++++++++++---- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/fast_agent/mcp/prompt_message_extended.py b/src/fast_agent/mcp/prompt_message_extended.py index 5ad16d9c..4e8e41f7 100644 --- a/src/fast_agent/mcp/prompt_message_extended.py +++ b/src/fast_agent/mcp/prompt_message_extended.py @@ -1,4 +1,4 @@ -from typing import Mapping, Sequence +from typing import Any, Mapping, Sequence from mcp.types import ( CallToolRequest, @@ -27,7 +27,8 @@ class PromptMessageExtended(BaseModel): content: list[ContentBlock] = [] tool_calls: dict[str, CallToolRequest] | None = None tool_results: dict[str, CallToolResult] | None = None - channels: Mapping[str, Sequence[ContentBlock]] | None = None + # Channels can carry provider-specific payloads (e.g., raw Anthropic thinking blocks). + channels: Mapping[str, Sequence[Any]] | None = None stop_reason: LlmStopReason | None = None is_template: bool = False diff --git a/src/fast_agent/ui/interactive_prompt.py b/src/fast_agent/ui/interactive_prompt.py index ce16ce2f..06ab1943 100644 --- a/src/fast_agent/ui/interactive_prompt.py +++ b/src/fast_agent/ui/interactive_prompt.py @@ -1398,7 +1398,7 @@ async def _show_markdown(self, prompt_provider: "AgentApp", agent_name: str) -> rich_print("[yellow]No message history available[/yellow]") return - message_history = agent.llm.message_history + message_history = agent.message_history if not message_history: rich_print("[yellow]No messages in history[/yellow]") return diff --git a/src/fast_agent/ui/streaming.py b/src/fast_agent/ui/streaming.py index bec39233..b2076579 100644 --- a/src/fast_agent/ui/streaming.py +++ b/src/fast_agent/ui/streaming.py @@ -124,6 +124,7 @@ def __init__( self._has_reasoning = False self._reasoning_active = False self._tool_active = False + self._render_reasoning_stream = True if self._async_mode and self._loop and self._queue is not None: self._worker_task = self._loop.create_task(self._render_worker()) @@ -582,12 +583,15 @@ def _process_reasoning_chunk(self, chunk: str) -> bool: for segment in segments: if segment.is_thinking: - self._begin_reasoning_mode() - self._append_plain_text(segment.text, is_reasoning=True) + if self._render_reasoning_stream: + self._begin_reasoning_mode() + self._append_plain_text(segment.text, is_reasoning=True) handled = True else: if self._reasoning_active: self._end_reasoning_mode() + if self._render_reasoning_stream and self._has_reasoning: + self._drop_reasoning_stream() emitted_non_thinking = True self._append_text_in_current_mode(segment.text) handled = True @@ -610,14 +614,25 @@ def _handle_stream_chunk(self, chunk: StreamChunk) -> bool: return True if chunk.is_reasoning: - self._begin_reasoning_mode() - return self._append_plain_text(chunk.text, is_reasoning=True) + if self._render_reasoning_stream: + self._begin_reasoning_mode() + return self._append_plain_text(chunk.text, is_reasoning=True) + return False + if self._render_reasoning_stream and self._has_reasoning: + self._drop_reasoning_stream() if self._reasoning_active: self._end_reasoning_mode() return self._append_text_in_current_mode(chunk.text) + def _drop_reasoning_stream(self) -> None: + self._render_reasoning_stream = False + self._has_reasoning = False + self._reasoning_active = False + self._styled_buffer.clear() + self._buffer.clear() + def _handle_chunk(self, chunk: str) -> bool: if not 
chunk: return False From fbaaccc18209739c37e243107923de9a07a56251 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Fri, 26 Dec 2025 21:22:17 +0000 Subject: [PATCH 4/5] drop reasoning content during streaming --- src/fast_agent/ui/streaming.py | 11 +++++++++-- .../unit/fast_agent/ui/test_streaming_mode_switch.py | 6 ++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/fast_agent/ui/streaming.py b/src/fast_agent/ui/streaming.py index b2076579..c30e4833 100644 --- a/src/fast_agent/ui/streaming.py +++ b/src/fast_agent/ui/streaming.py @@ -321,6 +321,8 @@ def _append_plain_text(self, text: str, *, is_reasoning: bool | None = None) -> processed = self._wrap_plain_chunk(processed) if self._pending_table_row: self._buffer.append(self._pending_table_row) + if self._has_reasoning: + self._styled_buffer.append((self._pending_table_row, False)) self._pending_table_row = "" self._buffer.append(processed) if self._has_reasoning: @@ -627,11 +629,16 @@ def _handle_stream_chunk(self, chunk: StreamChunk) -> bool: return self._append_text_in_current_mode(chunk.text) def _drop_reasoning_stream(self) -> None: + if not self._has_reasoning: + return + if self._styled_buffer: + kept = [text for text, is_reasoning in self._styled_buffer if not is_reasoning] + rebuilt = "".join(kept) + self._buffer = [rebuilt] if rebuilt else [] + self._styled_buffer.clear() self._render_reasoning_stream = False self._has_reasoning = False self._reasoning_active = False - self._styled_buffer.clear() - self._buffer.clear() def _handle_chunk(self, chunk: str) -> bool: if not chunk: diff --git a/tests/unit/fast_agent/ui/test_streaming_mode_switch.py b/tests/unit/fast_agent/ui/test_streaming_mode_switch.py index b1a42fab..f80cd00a 100644 --- a/tests/unit/fast_agent/ui/test_streaming_mode_switch.py +++ b/tests/unit/fast_agent/ui/test_streaming_mode_switch.py @@ -58,13 +58,11 @@ def test_reasoning_stream_switches_back_to_markdown() -> None: text = "".join(handle._buffer) intro_idx = text.find("Intro") - thinking_idx = text.find("Thinking") answer_idx = text.find("Answer") assert intro_idx != -1 - assert thinking_idx != -1 assert answer_idx != -1 - assert "\n" in text[intro_idx + len("Intro") : thinking_idx] - assert "\n" in text[thinking_idx + len("Thinking") : answer_idx] + assert text.find("Thinking") == -1 + assert "\n" in text[intro_idx + len("Intro") : answer_idx] finally: _restore_console_size(original_width, original_height) From c3c111a71e1baba0e6545df614bd18cd57c85d04 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Fri, 26 Dec 2025 21:35:36 +0000 Subject: [PATCH 5/5] type constraints on channels, serde --- .../llm/provider/anthropic/llm_anthropic.py | 15 ++++++- .../multipart_converter_anthropic.py | 24 +++++++++++ src/fast_agent/mcp/prompt_message_extended.py | 6 +-- .../test_multipart_converter_anthropic.py | 40 +++++++++++++++++++ 4 files changed, 81 insertions(+), 4 deletions(-) diff --git a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py index 6cd0e4dd..dd312459 100644 --- a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py +++ b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py @@ -781,7 +781,20 @@ async def _anthropic_completion( if raw_thinking_blocks: if channels is None: channels = {} - channels[ANTHROPIC_THINKING_BLOCKS] = raw_thinking_blocks + serialized_blocks = [] + for block in raw_thinking_blocks: + try: + payload = block.model_dump() 
+ except Exception: + payload = {"type": getattr(block, "type", "thinking")} + if isinstance(block, ThinkingBlock): + payload.update( + {"thinking": block.thinking, "signature": block.signature} + ) + elif isinstance(block, RedactedThinkingBlock): + payload.update({"data": block.data}) + serialized_blocks.append(TextContent(type="text", text=json.dumps(payload))) + channels[ANTHROPIC_THINKING_BLOCKS] = serialized_blocks return PromptMessageExtended( role="assistant", diff --git a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py index 23bfa450..625b1a62 100644 --- a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py +++ b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py @@ -1,3 +1,4 @@ +import json import re from typing import Literal, Sequence, Union, cast from urllib.parse import urlparse @@ -11,6 +12,7 @@ MessageParam, PlainTextSourceParam, RedactedThinkingBlock, + RedactedThinkingBlockParam, TextBlockParam, ThinkingBlock, ThinkingBlockParam, @@ -106,6 +108,28 @@ def convert_to_anthropic(multipart_msg: PromptMessageExtended) -> MessageParam: # Redacted thinking blocks are passed as-is # They contain encrypted data that the API can verify all_content_blocks.append(thinking_block) + elif isinstance(thinking_block, TextContent): + try: + payload = json.loads(thinking_block.text) + except (TypeError, json.JSONDecodeError): + payload = None + if isinstance(payload, dict): + block_type = payload.get("type") + if block_type == "thinking": + all_content_blocks.append( + ThinkingBlockParam( + type="thinking", + thinking=payload.get("thinking", ""), + signature=payload.get("signature", ""), + ) + ) + elif block_type == "redacted_thinking": + all_content_blocks.append( + RedactedThinkingBlockParam( + type="redacted_thinking", + data=payload.get("data", ""), + ) + ) for tool_use_id, req in multipart_msg.tool_calls.items(): sanitized_id = AnthropicConverter._sanitize_tool_id(tool_use_id) diff --git a/src/fast_agent/mcp/prompt_message_extended.py b/src/fast_agent/mcp/prompt_message_extended.py index 4e8e41f7..6d978d28 100644 --- a/src/fast_agent/mcp/prompt_message_extended.py +++ b/src/fast_agent/mcp/prompt_message_extended.py @@ -1,4 +1,4 @@ -from typing import Any, Mapping, Sequence +from typing import Mapping, Sequence from mcp.types import ( CallToolRequest, @@ -27,11 +27,11 @@ class PromptMessageExtended(BaseModel): content: list[ContentBlock] = [] tool_calls: dict[str, CallToolRequest] | None = None tool_results: dict[str, CallToolResult] | None = None - # Channels can carry provider-specific payloads (e.g., raw Anthropic thinking blocks). 
- channels: Mapping[str, Sequence[Any]] | None = None + channels: Mapping[str, Sequence[ContentBlock]] | None = None stop_reason: LlmStopReason | None = None is_template: bool = False + @classmethod def to_extended(cls, messages: list[PromptMessage]) -> list["PromptMessageExtended"]: """Convert a sequence of PromptMessages into PromptMessageExtended objects.""" diff --git a/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py b/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py index 26fd1b35..c405bb8b 100644 --- a/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py +++ b/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py @@ -1,8 +1,11 @@ import base64 +import json import unittest from mcp.types import ( BlobResourceContents, + CallToolRequest, + CallToolRequestParams, CallToolResult, EmbeddedResource, ImageContent, @@ -12,6 +15,7 @@ ) from pydantic import AnyUrl +from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS from fast_agent.llm.provider.anthropic.multipart_converter_anthropic import ( AnthropicConverter, ) @@ -715,6 +719,42 @@ def test_assistant_multiple_text_blocks(self): self.assertEqual(anthropic_msg["content"][1]["type"], "text") self.assertEqual(anthropic_msg["content"][1]["text"], "Second part of response") + def test_assistant_thinking_blocks_deserialized_from_channel(self): + """Ensure thinking channel JSON is converted to Anthropic thinking params.""" + thinking_payload = { + "type": "thinking", + "thinking": "Reasoning summary.", + "signature": "sig123", + } + redacted_payload = {"type": "redacted_thinking", "data": "opaque"} + channels = { + ANTHROPIC_THINKING_BLOCKS: [ + TextContent(type="text", text=json.dumps(thinking_payload)), + TextContent(type="text", text=json.dumps(redacted_payload)), + ] + } + tool_calls = { + "toolu_1": CallToolRequest( + method="tools/call", + params=CallToolRequestParams(name="test_tool", arguments={"x": 1}), + ) + } + multipart = PromptMessageExtended( + role="assistant", content=[], tool_calls=tool_calls, channels=channels + ) + + anthropic_msg = AnthropicConverter.convert_to_anthropic(multipart) + + self.assertEqual(anthropic_msg["role"], "assistant") + self.assertEqual(len(anthropic_msg["content"]), 3) + self.assertEqual(anthropic_msg["content"][0]["type"], "thinking") + self.assertEqual(anthropic_msg["content"][0]["thinking"], "Reasoning summary.") + self.assertEqual(anthropic_msg["content"][0]["signature"], "sig123") + self.assertEqual(anthropic_msg["content"][1]["type"], "redacted_thinking") + self.assertEqual(anthropic_msg["content"][1]["data"], "opaque") + self.assertEqual(anthropic_msg["content"][2]["type"], "tool_use") + self.assertEqual(anthropic_msg["content"][2]["name"], "test_tool") + def test_assistant_non_text_content_stripped(self): """Test that non-text content is stripped from assistant messages.""" # Create a mixed content message with text and image
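
Usage sketch (illustrative, not part of the patches): the snippet below shows
how the PATCH 1/5 settings resolve into request arguments, assuming the
fast_agent package layout from the diffs above. The clamp mirrors
_get_thinking_budget() and the max_tokens bump mirrors the logic in
_anthropic_completion(); the standalone arithmetic here is a simplification
of those code paths, and have_tools is a hypothetical stand-in for the
available_tools check.

    from fast_agent.config import AnthropicSettings

    settings = AnthropicSettings(thinking_enabled=True, thinking_budget_tokens=20000)

    # _get_thinking_budget() enforces the API minimum of 1024 tokens.
    budget = max(1024, settings.thinking_budget_tokens)

    # The budget must stay below max_tokens; _anthropic_completion() raises
    # max_tokens to budget + 8192 when the requested maximum would not fit.
    requested_max = 16000  # fallback the patch uses when params.maxTokens is None
    max_tokens = budget + 8192 if requested_max <= budget else requested_max

    base_args = {
        "thinking": {"type": "enabled", "budget_tokens": budget},
        "max_tokens": max_tokens,
    }

    have_tools = True  # hypothetical: tools are attached to this request
    if have_tools:
        # Interleaved thinking between tool calls needs this beta header.
        base_args["extra_headers"] = {"anthropic-beta": "interleaved-thinking-2025-05-14"}

    print(base_args["max_tokens"])  # 28192

With these values the request carries thinking={"type": "enabled",
"budget_tokens": 20000} and max_tokens raised to 28192; when structured
output forces tool choice, the completion path instead logs a warning and
drops the thinking block entirely.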
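The PATCH 5/5 serde keeps channels typed as Sequence[ContentBlock] by storing
each raw thinking block as JSON inside a text content block. Below is a
dependency-free sketch of that round-trip, with plain dicts standing in for
mcp.types.TextContent and the anthropic ThinkingBlockParam /
RedactedThinkingBlockParam types used in the real code:

    import json

    # Producer side (llm_anthropic.py): a thinking block is dumped to a JSON
    # payload and wrapped in a text block under the "anthropic-thinking-raw"
    # channel (the ANTHROPIC_THINKING_BLOCKS constant).
    raw_block = {"type": "thinking", "thinking": "Reasoning summary.", "signature": "sig123"}
    channel_entry = {"type": "text", "text": json.dumps(raw_block)}

    # Consumer side (multipart_converter_anthropic.py): the JSON is parsed
    # back and re-emitted as a thinking param, placed FIRST in the assistant
    # message content ahead of any tool_use blocks.
    block_param = None
    payload = json.loads(channel_entry["text"])
    if payload.get("type") == "thinking":
        block_param = {
            "type": "thinking",
            "thinking": payload.get("thinking", ""),
            "signature": payload.get("signature", ""),
        }
    elif payload.get("type") == "redacted_thinking":
        block_param = {"type": "redacted_thinking", "data": payload.get("data", "")}

    assert block_param == raw_block

Placing the rebuilt thinking params before the tool_use blocks matters: the
API verifies the signature when the assistant turn is replayed during
tool-use passback, which is what the converter's "thinking blocks must come
FIRST" comment and the test_assistant_thinking_blocks_deserialized_from_channel
test above exercise.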