diff --git a/src/fast_agent/config.py b/src/fast_agent/config.py
index 4ba426f2..58fad7b2 100644
--- a/src/fast_agent/config.py
+++ b/src/fast_agent/config.py
@@ -296,6 +296,20 @@ class AnthropicSettings(BaseModel):
     - "auto": Currently same as "prompt" - caches tools+system prompt (1 block) and template content.
     """

+    thinking_enabled: bool = False
+    """
+    Enable extended thinking for supported Claude models (Sonnet 4+, Opus 4+).
+    When enabled, Claude will show its step-by-step reasoning process.
+    Note: Extended thinking is incompatible with structured output (forced tool choice).
+    """
+
+    thinking_budget_tokens: int = 10000
+    """
+    Maximum tokens for Claude's internal reasoning process (minimum 1024).
+    Larger budgets enable more thorough analysis for complex problems.
+    Must be less than max_tokens.
+    """
+
     model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)
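
The two new `AnthropicSettings` fields above are the only user-facing switch for extended thinking. A minimal sketch of setting them programmatically, assuming the existing `fast_agent.config` import path and default values for the remaining fields (most deployments would set the equivalent keys in their fast-agent configuration file instead):

```python
# Hedged sketch: the field names come from the diff above; everything else is illustrative.
from fast_agent.config import AnthropicSettings

settings = AnthropicSettings(
    thinking_enabled=True,         # opt in to extended thinking on supported models
    thinking_budget_tokens=20000,  # internal reasoning budget, floored to 1024 at request time
)

# Mirrors _get_thinking_budget(): the provider never sends a budget below 1024 tokens.
effective_budget = max(1024, settings.thinking_budget_tokens)
print(effective_budget)  # 20000
```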
diff --git a/src/fast_agent/constants.py b/src/fast_agent/constants.py
index a02e4176..a60fa290 100644
--- a/src/fast_agent/constants.py
+++ b/src/fast_agent/constants.py
@@ -6,6 +6,8 @@ HUMAN_INPUT_TOOL_NAME = "__human_input"
 MCP_UI = "mcp-ui"
 REASONING = "reasoning"
+ANTHROPIC_THINKING_BLOCKS = "anthropic-thinking-raw"
+"""Raw Anthropic thinking blocks with signatures for tool use passback."""
 FAST_AGENT_ERROR_CHANNEL = "fast-agent-error"
 FAST_AGENT_REMOVED_METADATA_CHANNEL = "fast-agent-removed-meta"
 FAST_AGENT_TIMING = "fast-agent-timing"
diff --git a/src/fast_agent/llm/model_database.py b/src/fast_agent/llm/model_database.py
index 4b3d6279..af2009f7 100644
--- a/src/fast_agent/llm/model_database.py
+++ b/src/fast_agent/llm/model_database.py
@@ -158,10 +158,23 @@ class ModelDatabase:
     ANTHROPIC_OPUS_4_VERSIONED = ModelParameters(
-        context_window=200000, max_output_tokens=32000, tokenizes=ANTHROPIC_MULTIMODAL
+        context_window=200000,
+        max_output_tokens=32000,
+        tokenizes=ANTHROPIC_MULTIMODAL,
+        reasoning="anthropic_thinking",
     )

     ANTHROPIC_SONNET_4_VERSIONED = ModelParameters(
-        context_window=200000, max_output_tokens=64000, tokenizes=ANTHROPIC_MULTIMODAL
+        context_window=200000,
+        max_output_tokens=64000,
+        tokenizes=ANTHROPIC_MULTIMODAL,
+        reasoning="anthropic_thinking",
+    )
+    # Claude 3.7 Sonnet supports extended thinking (deprecated but still available)
+    ANTHROPIC_37_SERIES_THINKING = ModelParameters(
+        context_window=200000,
+        max_output_tokens=16384,
+        tokenizes=ANTHROPIC_MULTIMODAL,
+        reasoning="anthropic_thinking",
     )

     DEEPSEEK_CHAT_STANDARD = ModelParameters(
@@ -324,9 +337,9 @@ class ModelDatabase:
         "claude-3-5-sonnet-20240620": ANTHROPIC_35_SERIES,
         "claude-3-5-sonnet-20241022": ANTHROPIC_35_SERIES,
         "claude-3-5-sonnet-latest": ANTHROPIC_35_SERIES,
-        "claude-3-7-sonnet": ANTHROPIC_37_SERIES,
-        "claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES,
-        "claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES,
+        "claude-3-7-sonnet": ANTHROPIC_37_SERIES_THINKING,
+        "claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES_THINKING,
+        "claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES_THINKING,
         "claude-sonnet-4-0": ANTHROPIC_SONNET_4_VERSIONED,
         "claude-sonnet-4-20250514": ANTHROPIC_SONNET_4_VERSIONED,
         "claude-sonnet-4-5": ANTHROPIC_SONNET_4_VERSIONED,
diff --git a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py
index f3c6a2a9..dd312459 100644
--- a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py
+++ b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py
@@ -15,9 +15,13 @@
     RawContentBlockStartEvent,
     RawContentBlockStopEvent,
     RawMessageDeltaEvent,
+    RedactedThinkingBlock,
+    SignatureDelta,
     TextBlock,
     TextBlockParam,
     TextDelta,
+    ThinkingBlock,
+    ThinkingDelta,
     ToolParam,
     ToolUseBlock,
     ToolUseBlockParam,
@@ -32,7 +36,7 @@
     TextContent,
 )

-from fast_agent.constants import FAST_AGENT_ERROR_CHANNEL
+from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS, FAST_AGENT_ERROR_CHANNEL, REASONING
 from fast_agent.core.exceptions import ProviderKeyError
 from fast_agent.core.logging.logger import get_logger
 from fast_agent.core.prompt import Prompt
@@ -137,6 +141,23 @@ def _get_cache_mode(self) -> str:
             cache_mode = self.context.config.anthropic.cache_mode
         return cache_mode

+    def _is_thinking_enabled(self, model: str) -> bool:
+        """Check if extended thinking should be enabled for this request."""
+        from fast_agent.llm.model_database import ModelDatabase
+
+        if ModelDatabase.get_reasoning(model) != "anthropic_thinking":
+            return False
+        if self.context.config and self.context.config.anthropic:
+            return self.context.config.anthropic.thinking_enabled
+        return False
+
+    def _get_thinking_budget(self) -> int:
+        """Get the thinking budget tokens (minimum 1024)."""
+        if self.context.config and self.context.config.anthropic:
+            budget = getattr(self.context.config.anthropic, "thinking_budget_tokens", 10000)
+            return max(1024, budget)
+        return 10000
+
     async def _prepare_tools(
         self, structured_model: Type[ModelT] | None = None, tools: list[Tool] | None = None
     ) -> list[ToolParam]:
@@ -280,11 +301,13 @@ async def _process_stream(
         self,
         stream: AsyncMessageStream,
         model: str,
         capture_filename: Path | None = None,
-    ) -> Message:
+    ) -> tuple[Message, list[str]]:
         """Process the streaming response and display real-time token usage."""
         # Track estimated output tokens by counting text chunks
         estimated_tokens = 0
         tool_streams: dict[int, dict[str, Any]] = {}
+        thinking_segments: list[str] = []
+        thinking_indices: set[int] = set()

         try:
             # Process the raw event stream to get token counts
@@ -295,6 +318,9 @@
                 if isinstance(event, RawContentBlockStartEvent):
                     content_block = event.content_block
+                    if isinstance(content_block, (ThinkingBlock, RedactedThinkingBlock)):
+                        thinking_indices.add(event.index)
+                        continue
                     if isinstance(content_block, ToolUseBlock):
                         tool_streams[event.index] = {
                             "name": content_block.name,
@@ -324,6 +350,15 @@
                 if isinstance(event, RawContentBlockDeltaEvent):
                     delta = event.delta
+                    if isinstance(delta, ThinkingDelta):
+                        if delta.thinking:
+                            self._notify_stream_listeners(
+                                StreamChunk(text=delta.thinking, is_reasoning=True)
+                            )
+                            thinking_segments.append(delta.thinking)
+                        continue
+                    if isinstance(delta, SignatureDelta):
+                        continue
                     if isinstance(delta, InputJSONDelta):
                         info = tool_streams.get(event.index)
                         if info is not None:
@@ -349,6 +384,10 @@
                             )
                     continue

+                if isinstance(event, RawContentBlockStopEvent) and event.index in thinking_indices:
+                    thinking_indices.discard(event.index)
+                    continue
+
                 if isinstance(event, RawContentBlockStopEvent) and event.index in tool_streams:
                     info = tool_streams.pop(event.index)
                     preview_raw = "".join(info.get("buffer", []))
@@ -428,7 +467,7 @@
                 f"Streaming complete - Model: {model}, Input tokens: {message.usage.input_tokens}, Output tokens: {message.usage.output_tokens}"
             )

-            return message
+            return message, thinking_segments
         except APIError as error:
             logger.error("Streaming APIError during Anthropic completion", exc_info=error)
             raise  # Re-raise to be handled by _anthropic_completion
@@ -571,11 +610,34 @@ async def _anthropic_completion(
             base_args["system"] = self.instruction or params.systemPrompt

         if structured_model:
+            if self._is_thinking_enabled(model):
+                logger.warning(
+                    "Extended thinking is incompatible with structured output. "
+                    "Disabling thinking for this request."
+                )
             base_args["tool_choice"] = {"type": "tool", "name": STRUCTURED_OUTPUT_TOOL_NAME}

-        if params.maxTokens is not None:
+        thinking_enabled = self._is_thinking_enabled(model)
+        if thinking_enabled and structured_model:
+            thinking_enabled = False
+
+        if thinking_enabled:
+            thinking_budget = self._get_thinking_budget()
+            base_args["thinking"] = {
+                "type": "enabled",
+                "budget_tokens": thinking_budget,
+            }
+            current_max = params.maxTokens or 16000
+            if current_max <= thinking_budget:
+                base_args["max_tokens"] = thinking_budget + 8192
+            else:
+                base_args["max_tokens"] = current_max
+        elif params.maxTokens is not None:
             base_args["max_tokens"] = params.maxTokens

+        if thinking_enabled and available_tools:
+            base_args["extra_headers"] = {"anthropic-beta": "interleaved-thinking-2025-05-14"}
+
         self._log_chat_progress(self.chat_turn(), model=model)
         # Use the base class method to prepare all arguments with Anthropic-specific exclusions
         # Do this BEFORE applying cache control so metadata doesn't override cached fields
@@ -613,7 +675,9 @@ async def _anthropic_completion(
         try:
             async with anthropic.messages.stream(**arguments) as stream:
                 # Process the stream
-                response = await self._process_stream(stream, model, capture_filename)
+                response, thinking_segments = await self._process_stream(
+                    stream, model, capture_filename
+                )
         except asyncio.CancelledError as e:
             reason = str(e) if e.args else "cancelled"
             logger.info(f"Anthropic completion cancelled: {reason}")
@@ -658,8 +722,12 @@ async def _anthropic_completion(
         response_as_message = self.convert_message_to_message_param(response)
         messages.append(response_as_message)

-        if response.content and response.content[0].type == "text":
-            response_content_blocks.append(TextContent(type="text", text=response.content[0].text))
+        if response.content:
+            for content_block in response.content:
+                if isinstance(content_block, TextBlock):
+                    response_content_blocks.append(
+                        TextContent(type="text", text=content_block.text)
+                    )

         stop_reason: LlmStopReason = LlmStopReason.END_TURN
@@ -691,8 +759,49 @@ async def _anthropic_completion(
         self._log_chat_finished(model=model)

-        return Prompt.assistant(
-            *response_content_blocks, stop_reason=stop_reason, tool_calls=tool_calls
+        channels: dict[str, list[Any]] | None = None
+        if thinking_segments:
+            channels = {REASONING: [TextContent(type="text", text="".join(thinking_segments))]}
+        elif response.content:
+            thinking_texts = [
+                block.thinking
+                for block in response.content
+                if isinstance(block, ThinkingBlock) and block.thinking
+            ]
+            if thinking_texts:
+                channels = {REASONING: [TextContent(type="text", text="".join(thinking_texts))]}
+
+        raw_thinking_blocks = []
+        if response.content:
+            raw_thinking_blocks = [
+                block
+                for block in response.content
+                if isinstance(block, (ThinkingBlock, RedactedThinkingBlock))
+            ]
+        if raw_thinking_blocks:
+            if channels is None:
+                channels = {}
+            serialized_blocks = []
+            for block in raw_thinking_blocks:
+                try:
+                    payload = block.model_dump()
+                except Exception:
+                    payload = {"type": getattr(block, "type", "thinking")}
+                if isinstance(block, ThinkingBlock):
+                    payload.update(
+                        {"thinking": block.thinking, "signature": block.signature}
+                    )
+                elif isinstance(block, RedactedThinkingBlock):
+                    payload.update({"data": block.data})
+                serialized_blocks.append(TextContent(type="text", text=json.dumps(payload)))
+            channels[ANTHROPIC_THINKING_BLOCKS] = serialized_blocks
+
+        return PromptMessageExtended(
+            role="assistant",
+            content=response_content_blocks,
+            tool_calls=tool_calls,
+            channels=channels,
+            stop_reason=stop_reason,
         )

     async def _apply_prompt_provider_specific(
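
Taken together, `_get_thinking_budget` and the `_anthropic_completion` changes shape the outgoing request roughly as follows. This standalone sketch against the `anthropic` SDK is illustrative only: the model name, prompt, and 16000-token fallback are assumptions, while the `thinking` payload, the 1024-token floor, the `+ 8192` headroom, and the `interleaved-thinking-2025-05-14` beta header mirror the diff above.

```python
# Illustrative request shaping outside fast-agent; assumes ANTHROPIC_API_KEY is set.
from anthropic import Anthropic

client = Anthropic()

budget = max(1024, 10000)       # thinking_budget_tokens, floored at 1024
max_tokens = 16000              # params.maxTokens, or the 16000 fallback used in the diff
if max_tokens <= budget:
    max_tokens = budget + 8192  # keep headroom for visible output above the thinking budget

response = client.messages.create(
    model="claude-sonnet-4-0",  # any model whose ModelParameters carry reasoning="anthropic_thinking"
    max_tokens=max_tokens,
    thinking={"type": "enabled", "budget_tokens": budget},
    # Sent only when tools are present, allowing thinking between tool calls:
    extra_headers={"anthropic-beta": "interleaved-thinking-2025-05-14"},
    messages=[{"role": "user", "content": "Plan the refactor step by step."}],
)

for block in response.content:
    if block.type == "thinking":
        print("[thinking]", block.thinking[:200])
    elif block.type == "text":
        print(block.text)
```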
diff --git a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py
index 9f532578..625b1a62 100644
--- a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py
+++ b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py
@@ -1,3 +1,4 @@
+import json
 import re
 from typing import Literal, Sequence, Union, cast
 from urllib.parse import urlparse
@@ -10,7 +11,11 @@
     ImageBlockParam,
     MessageParam,
     PlainTextSourceParam,
+    RedactedThinkingBlock,
+    RedactedThinkingBlockParam,
     TextBlockParam,
+    ThinkingBlock,
+    ThinkingBlockParam,
     ToolResultBlockParam,
     ToolUseBlockParam,
     URLImageSourceParam,
@@ -27,6 +32,7 @@
     TextResourceContents,
 )

+from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS
 from fast_agent.core.logging.logger import get_logger
 from fast_agent.mcp.helpers.content_helpers import (
     get_image_data,
@@ -76,12 +82,55 @@ def convert_to_anthropic(multipart_msg: PromptMessageExtended) -> MessageParam:
             An Anthropic API MessageParam object
         """
         role = multipart_msg.role
-        all_content_blocks = []
+        all_content_blocks: list = []

         # If this is an assistant message that contains tool_calls, convert
         # those into Anthropic tool_use blocks so the next user message can
         # legally include corresponding tool_result blocks.
         if role == "assistant" and multipart_msg.tool_calls:
+            # CRITICAL: Thinking blocks must come FIRST in assistant messages
+            # when using extended thinking with tool use
+            if multipart_msg.channels:
+                raw_thinking = multipart_msg.channels.get(ANTHROPIC_THINKING_BLOCKS)
+                if raw_thinking:
+                    for thinking_block in raw_thinking:
+                        # Pass through raw ThinkingBlock/RedactedThinkingBlock
+                        # These contain signatures needed for API verification
+                        if isinstance(thinking_block, ThinkingBlock):
+                            all_content_blocks.append(
+                                ThinkingBlockParam(
+                                    type="thinking",
+                                    thinking=thinking_block.thinking,
+                                    signature=thinking_block.signature,
+                                )
+                            )
+                        elif isinstance(thinking_block, RedactedThinkingBlock):
+                            # Redacted thinking blocks are passed as-is
+                            # They contain encrypted data that the API can verify
+                            all_content_blocks.append(thinking_block)
+                        elif isinstance(thinking_block, TextContent):
+                            try:
+                                payload = json.loads(thinking_block.text)
+                            except (TypeError, json.JSONDecodeError):
+                                payload = None
+                            if isinstance(payload, dict):
+                                block_type = payload.get("type")
+                                if block_type == "thinking":
+                                    all_content_blocks.append(
+                                        ThinkingBlockParam(
+                                            type="thinking",
+                                            thinking=payload.get("thinking", ""),
+                                            signature=payload.get("signature", ""),
+                                        )
+                                    )
+                                elif block_type == "redacted_thinking":
+                                    all_content_blocks.append(
+                                        RedactedThinkingBlockParam(
+                                            type="redacted_thinking",
+                                            data=payload.get("data", ""),
+                                        )
+                                    )
+
             for tool_use_id, req in multipart_msg.tool_calls.items():
                 sanitized_id = AnthropicConverter._sanitize_tool_id(tool_use_id)
                 params = req.params
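
The converter change above exists because Anthropic requires the signed thinking block to be replayed ahead of the `tool_use` block when a tool-use turn continues under extended thinking. A sketch of the message shape it produces, with illustrative values (the tool name, IDs, and signature are made up):

```python
# Hypothetical continuation request; only the ordering and block types matter here.
messages = [
    {"role": "user", "content": "What is the weather in Paris?"},
    {
        "role": "assistant",
        "content": [
            {
                "type": "thinking",            # must precede the tool_use block
                "thinking": "The user wants current weather; call the weather tool.",
                "signature": "EqQBCgIYAh...",  # opaque signature returned by the API
            },
            {
                "type": "tool_use",
                "id": "toolu_1",
                "name": "get_weather",
                "input": {"city": "Paris"},
            },
        ],
    },
    {
        "role": "user",
        "content": [
            {"type": "tool_result", "tool_use_id": "toolu_1", "content": "18°C, clear"},
        ],
    },
]
```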
diff --git a/src/fast_agent/llm/usage_tracking.py b/src/fast_agent/llm/usage_tracking.py
index e2672528..ffc491f5 100644
--- a/src/fast_agent/llm/usage_tracking.py
+++ b/src/fast_agent/llm/usage_tracking.py
@@ -149,6 +149,10 @@ def from_anthropic(cls, usage: AnthropicUsage, model: str) -> "TurnUsage":
             cache_write_tokens=cache_creation_tokens,  # Tokens written to cache (25% surcharge)
         )

+        # Extract thinking/reasoning tokens if available (extended thinking feature)
+        # Note: For Claude 4 models, you're billed for full thinking tokens, not summaries
+        thinking_tokens = getattr(usage, "thinking_tokens", 0) or 0
+
         return cls(
             provider=Provider.ANTHROPIC,
             model=model,
@@ -156,6 +160,7 @@ def from_anthropic(cls, usage: AnthropicUsage, model: str) -> "TurnUsage":
             output_tokens=usage.output_tokens,
             total_tokens=usage.input_tokens + usage.output_tokens,
             cache_usage=cache_usage,
+            reasoning_tokens=thinking_tokens,
             raw_usage=usage,  # Store the original Anthropic usage object
         )
diff --git a/src/fast_agent/mcp/prompt_message_extended.py b/src/fast_agent/mcp/prompt_message_extended.py
index 5ad16d9c..6d978d28 100644
--- a/src/fast_agent/mcp/prompt_message_extended.py
+++ b/src/fast_agent/mcp/prompt_message_extended.py
@@ -31,6 +31,7 @@ class PromptMessageExtended(BaseModel):
     stop_reason: LlmStopReason | None = None
     is_template: bool = False

+    @classmethod
     def to_extended(cls, messages: list[PromptMessage]) -> list["PromptMessageExtended"]:
         """Convert a sequence of PromptMessages into PromptMessageExtended objects."""
diff --git a/src/fast_agent/ui/interactive_prompt.py b/src/fast_agent/ui/interactive_prompt.py
index ce16ce2f..06ab1943 100644
--- a/src/fast_agent/ui/interactive_prompt.py
+++ b/src/fast_agent/ui/interactive_prompt.py
@@ -1398,7 +1398,7 @@ async def _show_markdown(self, prompt_provider: "AgentApp", agent_name: str) ->
             rich_print("[yellow]No message history available[/yellow]")
             return

-        message_history = agent.llm.message_history
+        message_history = agent.message_history
         if not message_history:
             rich_print("[yellow]No messages in history[/yellow]")
             return
diff --git a/src/fast_agent/ui/streaming.py b/src/fast_agent/ui/streaming.py
index bec39233..c30e4833 100644
--- a/src/fast_agent/ui/streaming.py
+++ b/src/fast_agent/ui/streaming.py
@@ -124,6 +124,7 @@ def __init__(
         self._has_reasoning = False
         self._reasoning_active = False
         self._tool_active = False
+        self._render_reasoning_stream = True

         if self._async_mode and self._loop and self._queue is not None:
             self._worker_task = self._loop.create_task(self._render_worker())
@@ -320,6 +321,8 @@ def _append_plain_text(self, text: str, *, is_reasoning: bool | None = None) ->
             processed = self._wrap_plain_chunk(processed)
         if self._pending_table_row:
             self._buffer.append(self._pending_table_row)
+            if self._has_reasoning:
+                self._styled_buffer.append((self._pending_table_row, False))
             self._pending_table_row = ""
         self._buffer.append(processed)
         if self._has_reasoning:
@@ -582,12 +585,15 @@ def _process_reasoning_chunk(self, chunk: str) -> bool:
         for segment in segments:
             if segment.is_thinking:
-                self._begin_reasoning_mode()
-                self._append_plain_text(segment.text, is_reasoning=True)
+                if self._render_reasoning_stream:
+                    self._begin_reasoning_mode()
+                    self._append_plain_text(segment.text, is_reasoning=True)
                 handled = True
             else:
                 if self._reasoning_active:
                     self._end_reasoning_mode()
+                if self._render_reasoning_stream and self._has_reasoning:
+                    self._drop_reasoning_stream()
                 emitted_non_thinking = True
                 self._append_text_in_current_mode(segment.text)
                 handled = True
@@ -610,14 +616,30 @@ def _handle_stream_chunk(self, chunk: StreamChunk) -> bool:
             return True

         if chunk.is_reasoning:
-            self._begin_reasoning_mode()
-            return self._append_plain_text(chunk.text, is_reasoning=True)
+            if self._render_reasoning_stream:
+                self._begin_reasoning_mode()
+                return self._append_plain_text(chunk.text, is_reasoning=True)
+            return False
+
+        if self._render_reasoning_stream and self._has_reasoning:
+            self._drop_reasoning_stream()
         if self._reasoning_active:
             self._end_reasoning_mode()

         return self._append_text_in_current_mode(chunk.text)

+    def _drop_reasoning_stream(self) -> None:
+        if not self._has_reasoning:
+            return
+        if self._styled_buffer:
+            kept = [text for text, is_reasoning in self._styled_buffer if not is_reasoning]
+            rebuilt = "".join(kept)
+            self._buffer = [rebuilt] if rebuilt else []
+            self._styled_buffer.clear()
+        self._render_reasoning_stream = False
+        self._has_reasoning = False
+        self._reasoning_active = False
+
     def _handle_chunk(self, chunk: str) -> bool:
         if not chunk:
             return False
diff --git a/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py b/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py
index 26fd1b35..c405bb8b 100644
--- a/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py
+++ b/tests/unit/fast_agent/llm/providers/test_multipart_converter_anthropic.py
@@ -1,8 +1,11 @@
 import base64
+import json
 import unittest

 from mcp.types import (
     BlobResourceContents,
+    CallToolRequest,
+    CallToolRequestParams,
     CallToolResult,
     EmbeddedResource,
     ImageContent,
@@ -12,6 +15,7 @@
 )
 from pydantic import AnyUrl

+from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS
 from fast_agent.llm.provider.anthropic.multipart_converter_anthropic import (
     AnthropicConverter,
 )
@@ -715,6 +719,42 @@
         self.assertEqual(anthropic_msg["content"][1]["type"], "text")
         self.assertEqual(anthropic_msg["content"][1]["text"], "Second part of response")

+    def test_assistant_thinking_blocks_deserialized_from_channel(self):
+        """Ensure thinking channel JSON is converted to Anthropic thinking params."""
+        thinking_payload = {
+            "type": "thinking",
+            "thinking": "Reasoning summary.",
+            "signature": "sig123",
+        }
+        redacted_payload = {"type": "redacted_thinking", "data": "opaque"}
+        channels = {
+            ANTHROPIC_THINKING_BLOCKS: [
+                TextContent(type="text", text=json.dumps(thinking_payload)),
+                TextContent(type="text", text=json.dumps(redacted_payload)),
+            ]
+        }
+        tool_calls = {
+            "toolu_1": CallToolRequest(
+                method="tools/call",
+                params=CallToolRequestParams(name="test_tool", arguments={"x": 1}),
+            )
+        }
+        multipart = PromptMessageExtended(
+            role="assistant", content=[], tool_calls=tool_calls, channels=channels
+        )
+
+        anthropic_msg = AnthropicConverter.convert_to_anthropic(multipart)
+
+        self.assertEqual(anthropic_msg["role"], "assistant")
+        self.assertEqual(len(anthropic_msg["content"]), 3)
+        self.assertEqual(anthropic_msg["content"][0]["type"], "thinking")
+        self.assertEqual(anthropic_msg["content"][0]["thinking"], "Reasoning summary.")
+        self.assertEqual(anthropic_msg["content"][0]["signature"], "sig123")
+        self.assertEqual(anthropic_msg["content"][1]["type"], "redacted_thinking")
+        self.assertEqual(anthropic_msg["content"][1]["data"], "opaque")
+        self.assertEqual(anthropic_msg["content"][2]["type"], "tool_use")
+        self.assertEqual(anthropic_msg["content"][2]["name"], "test_tool")
+
     def test_assistant_non_text_content_stripped(self):
         """Test that non-text content is stripped from assistant messages."""
         # Create a mixed content message with text and image
diff --git a/tests/unit/fast_agent/ui/test_streaming_mode_switch.py b/tests/unit/fast_agent/ui/test_streaming_mode_switch.py
index b1a42fab..f80cd00a 100644
--- a/tests/unit/fast_agent/ui/test_streaming_mode_switch.py
+++ b/tests/unit/fast_agent/ui/test_streaming_mode_switch.py
@@ -58,13 +58,11 @@ def test_reasoning_stream_switches_back_to_markdown() -> None:
         text = "".join(handle._buffer)

         intro_idx = text.find("Intro")
-        thinking_idx = text.find("Thinking")
         answer_idx = text.find("Answer")

         assert intro_idx != -1
-        assert thinking_idx != -1
         assert answer_idx != -1
-        assert "\n" in text[intro_idx + len("Intro") : thinking_idx]
-        assert "\n" in text[thinking_idx + len("Thinking") : answer_idx]
+        assert text.find("Thinking") == -1
+        assert "\n" in text[intro_idx + len("Intro") : answer_idx]
     finally:
         _restore_console_size(original_width, original_height)
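
Downstream code can read both new channels off the assistant message that `_anthropic_completion` now returns. A small sketch, assuming `result` is that `PromptMessageExtended` (how a caller obtains it depends on the surrounding agent code and is not part of this diff):

```python
# Sketch of consuming the reasoning channels added in this change.
import json

from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS, REASONING


def summarize_thinking(result) -> None:
    channels = result.channels or {}

    # Human-readable reasoning text, concatenated from the streamed ThinkingDelta chunks.
    for block in channels.get(REASONING, []):
        print("reasoning:", block.text)

    # Raw thinking blocks (JSON with signatures) preserved for tool-use passback.
    for block in channels.get(ANTHROPIC_THINKING_BLOCKS, []):
        payload = json.loads(block.text)
        print("raw block type:", payload.get("type"))
```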