diff --git a/src/fast_agent/config.py b/src/fast_agent/config.py
index 4ba426f2..c68ffda2 100644
--- a/src/fast_agent/config.py
+++ b/src/fast_agent/config.py
@@ -296,6 +296,15 @@ class AnthropicSettings(BaseModel):
     - "auto": Currently same as "prompt" - caches tools+system prompt (1 block) and template content.
     """

+    thinking_budget_tokens: int | None = None
+    """
+    Token budget for Claude's extended thinking (None = disabled, minimum 1024 when set).
+    When set, Claude will show its step-by-step reasoning for complex problems;
+    larger budgets enable more thorough analysis. The budget must be less than max_tokens.
+    Note: Extended thinking is incompatible with temperature, top_k, and forced
+    tool_choice (which structured output relies on).
+    """
+
     model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)
diff --git a/src/fast_agent/constants.py b/src/fast_agent/constants.py
index a02e4176..a60fa290 100644
--- a/src/fast_agent/constants.py
+++ b/src/fast_agent/constants.py
@@ -6,6 +6,8 @@
 HUMAN_INPUT_TOOL_NAME = "__human_input"
 MCP_UI = "mcp-ui"
 REASONING = "reasoning"
+ANTHROPIC_THINKING_BLOCKS = "anthropic-thinking-raw"
+"""Raw Anthropic thinking blocks with signatures for tool use passback."""
 FAST_AGENT_ERROR_CHANNEL = "fast-agent-error"
 FAST_AGENT_REMOVED_METADATA_CHANNEL = "fast-agent-removed-meta"
 FAST_AGENT_TIMING = "fast-agent-timing"
diff --git a/src/fast_agent/llm/model_database.py b/src/fast_agent/llm/model_database.py
index 4b3d6279..af2009f7 100644
--- a/src/fast_agent/llm/model_database.py
+++ b/src/fast_agent/llm/model_database.py
@@ -158,10 +158,23 @@
     )

     ANTHROPIC_OPUS_4_VERSIONED = ModelParameters(
-        context_window=200000, max_output_tokens=32000, tokenizes=ANTHROPIC_MULTIMODAL
+        context_window=200000,
+        max_output_tokens=32000,
+        tokenizes=ANTHROPIC_MULTIMODAL,
+        reasoning="anthropic_thinking",
     )

     ANTHROPIC_SONNET_4_VERSIONED = ModelParameters(
-        context_window=200000, max_output_tokens=64000, tokenizes=ANTHROPIC_MULTIMODAL
+        context_window=200000,
+        max_output_tokens=64000,
+        tokenizes=ANTHROPIC_MULTIMODAL,
+        reasoning="anthropic_thinking",
+    )
+    # Claude 3.7 Sonnet supports extended thinking (the model is deprecated but still available)
+    ANTHROPIC_37_SERIES_THINKING = ModelParameters(
+        context_window=200000,
+        max_output_tokens=16384,
+        tokenizes=ANTHROPIC_MULTIMODAL,
+        reasoning="anthropic_thinking",
     )

     DEEPSEEK_CHAT_STANDARD = ModelParameters(
@@ -324,9 +337,9 @@
         "claude-3-5-sonnet-20240620": ANTHROPIC_35_SERIES,
         "claude-3-5-sonnet-20241022": ANTHROPIC_35_SERIES,
         "claude-3-5-sonnet-latest": ANTHROPIC_35_SERIES,
-        "claude-3-7-sonnet": ANTHROPIC_37_SERIES,
-        "claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES,
-        "claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES,
+        "claude-3-7-sonnet": ANTHROPIC_37_SERIES_THINKING,
+        "claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES_THINKING,
+        "claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES_THINKING,
         "claude-sonnet-4-0": ANTHROPIC_SONNET_4_VERSIONED,
         "claude-sonnet-4-20250514": ANTHROPIC_SONNET_4_VERSIONED,
         "claude-sonnet-4-5": ANTHROPIC_SONNET_4_VERSIONED,
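
For orientation, the new thinking_budget_tokens setting and the "anthropic_thinking" capability flag above map onto the `thinking` parameter of the Anthropic Messages API. A minimal sketch of that raw call, with an illustrative model name and budget (the SDK usage follows the public anthropic package, not code from this diff):

    # Illustrative only: the request shape the provider code in this diff builds up.
    from anthropic import Anthropic

    client = Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-0",   # any model whose reasoning capability is "anthropic_thinking"
        max_tokens=16000,            # must be greater than budget_tokens
        thinking={"type": "enabled", "budget_tokens": 4096},
        messages=[{"role": "user", "content": "Plan a 3-step refactor of this module."}],
    )
    # response.content interleaves ThinkingBlock / RedactedThinkingBlock items with TextBlock items.
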
diff --git a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py
index f3c6a2a9..e4cf0a6a 100644
--- a/src/fast_agent/llm/provider/anthropic/llm_anthropic.py
+++ b/src/fast_agent/llm/provider/anthropic/llm_anthropic.py
@@ -15,9 +15,13 @@
     RawContentBlockStartEvent,
     RawContentBlockStopEvent,
     RawMessageDeltaEvent,
+    RedactedThinkingBlock,
+    SignatureDelta,
     TextBlock,
     TextBlockParam,
     TextDelta,
+    ThinkingBlock,
+    ThinkingDelta,
     ToolParam,
     ToolUseBlock,
     ToolUseBlockParam,
@@ -32,7 +36,7 @@
     TextContent,
 )

-from fast_agent.constants import FAST_AGENT_ERROR_CHANNEL
+from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS, FAST_AGENT_ERROR_CHANNEL, REASONING
 from fast_agent.core.exceptions import ProviderKeyError
 from fast_agent.core.logging.logger import get_logger
 from fast_agent.core.prompt import Prompt
@@ -137,6 +141,39 @@ def _get_cache_mode(self) -> str:
             cache_mode = self.context.config.anthropic.cache_mode
         return cache_mode

+    def _get_thinking_budget(self, model: str) -> int | None:
+        """
+        Get the thinking budget tokens if extended thinking is enabled.
+
+        Returns None if thinking is disabled, otherwise returns the budget
+        (enforcing minimum of 1024 tokens per Anthropic API requirements).
+        """
+        from fast_agent.llm.model_database import ModelDatabase
+
+        # Model must support anthropic_thinking
+        if ModelDatabase.get_reasoning(model) != "anthropic_thinking":
+            return None
+
+        # Check if budget is configured (None = disabled)
+        if self.context.config and self.context.config.anthropic:
+            budget = self.context.config.anthropic.thinking_budget_tokens
+            if budget is None:
+                return None
+            # Enforce minimum of 1024 per API requirements
+            if budget < 1024:
+                logger.warning(
+                    f"Thinking budget {budget} is below minimum of 1024, using 1024"
+                )
+                return 1024
+            # Warn about high budgets that may cause timeouts
+            if budget > 32000:
+                logger.warning(
+                    f"Thinking budget {budget} exceeds 32K; consider batch processing to avoid timeouts"
+                )
+            return budget
+
+        return None
+
     async def _prepare_tools(
         self, structured_model: Type[ModelT] | None = None, tools: list[Tool] | None = None
     ) -> list[ToolParam]:
@@ -280,11 +317,13 @@ async def _process_stream(
         self,
         stream: AsyncMessageStream,
         model: str,
         capture_filename: Path | None = None,
-    ) -> Message:
+    ) -> tuple[Message, list[str]]:
         """Process the streaming response and display real-time token usage."""
         # Track estimated output tokens by counting text chunks
         estimated_tokens = 0
         tool_streams: dict[int, dict[str, Any]] = {}
+        thinking_segments: list[str] = []
+        thinking_indices: set[int] = set()

         try:
             # Process the raw event stream to get token counts
@@ -295,6 +334,9 @@
                 if isinstance(event, RawContentBlockStartEvent):
                     content_block = event.content_block
+                    if isinstance(content_block, (ThinkingBlock, RedactedThinkingBlock)):
+                        thinking_indices.add(event.index)
+                        continue
                     if isinstance(content_block, ToolUseBlock):
                         tool_streams[event.index] = {
                             "name": content_block.name,
@@ -324,6 +366,15 @@
                 if isinstance(event, RawContentBlockDeltaEvent):
                     delta = event.delta
+                    if isinstance(delta, ThinkingDelta):
+                        if delta.thinking:
+                            self._notify_stream_listeners(
+                                StreamChunk(text=delta.thinking, is_reasoning=True)
+                            )
+                            thinking_segments.append(delta.thinking)
+                        continue
+                    if isinstance(delta, SignatureDelta):
+                        continue
                     if isinstance(delta, InputJSONDelta):
                         info = tool_streams.get(event.index)
                         if info is not None:
@@ -349,6 +400,10 @@
                         )
                     continue

+                if isinstance(event, RawContentBlockStopEvent) and event.index in thinking_indices:
+                    thinking_indices.discard(event.index)
+                    continue
+
                 if isinstance(event, RawContentBlockStopEvent) and event.index in tool_streams:
                     info = tool_streams.pop(event.index)
                     preview_raw = "".join(info.get("buffer", []))
@@ -428,7 +483,7 @@ async def _process_stream(
            logger.info(
                f"Streaming complete - Model: {model}, Input tokens: {message.usage.input_tokens}, Output tokens: {message.usage.output_tokens}"
            )

-            return message
+            return message, thinking_segments
         except APIError as error:
             logger.error("Streaming APIError during Anthropic completion", exc_info=error)
             raise  # Re-raise to be handled by _anthropic_completion
@@ -570,12 +625,35 @@ async def _anthropic_completion(
         if self.instruction or params.systemPrompt:
             base_args["system"] = self.instruction or params.systemPrompt

+        # Get thinking budget (None = disabled)
+        thinking_budget = self._get_thinking_budget(model)
+
         if structured_model:
+            if thinking_budget is not None:
+                logger.warning(
+                    "Extended thinking is incompatible with structured output. "
+                    "Disabling thinking for this request."
+                )
+                thinking_budget = None
             base_args["tool_choice"] = {"type": "tool", "name": STRUCTURED_OUTPUT_TOOL_NAME}

-        if params.maxTokens is not None:
+        if thinking_budget is not None:
+            base_args["thinking"] = {
+                "type": "enabled",
+                "budget_tokens": thinking_budget,
+            }
+            # max_tokens must exceed budget_tokens
+            current_max = params.maxTokens or 16000
+            if current_max <= thinking_budget:
+                base_args["max_tokens"] = thinking_budget + 8192
+            else:
+                base_args["max_tokens"] = current_max
+        elif params.maxTokens is not None:
             base_args["max_tokens"] = params.maxTokens

+        if thinking_budget is not None and available_tools:
+            base_args["extra_headers"] = {"anthropic-beta": "interleaved-thinking-2025-05-14"}
+
         self._log_chat_progress(self.chat_turn(), model=model)
         # Use the base class method to prepare all arguments with Anthropic-specific exclusions
         # Do this BEFORE applying cache control so metadata doesn't override cached fields
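
To make the max_tokens adjustment in the hunk above concrete, here is a worked example with illustrative numbers (the + 8192 headroom is the value used in this diff):

    # Worked example of the budget / max_tokens interplay above (numbers illustrative).
    thinking_budget = 10_000
    requested_max = 8_000          # params.maxTokens from the request
    max_tokens = thinking_budget + 8192 if requested_max <= thinking_budget else requested_max
    assert max_tokens == 18_192    # leaves 8192 tokens of headroom for the visible reply
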
@@ -613,7 +691,9 @@
         try:
             async with anthropic.messages.stream(**arguments) as stream:
                 # Process the stream
-                response = await self._process_stream(stream, model, capture_filename)
+                response, thinking_segments = await self._process_stream(
+                    stream, model, capture_filename
+                )
         except asyncio.CancelledError as e:
             reason = str(e) if e.args else "cancelled"
             logger.info(f"Anthropic completion cancelled: {reason}")
@@ -658,8 +738,12 @@
         response_as_message = self.convert_message_to_message_param(response)
         messages.append(response_as_message)

-        if response.content and response.content[0].type == "text":
-            response_content_blocks.append(TextContent(type="text", text=response.content[0].text))
+        if response.content:
+            for content_block in response.content:
+                if isinstance(content_block, TextBlock):
+                    response_content_blocks.append(
+                        TextContent(type="text", text=content_block.text)
+                    )

         stop_reason: LlmStopReason = LlmStopReason.END_TURN
@@ -691,8 +775,36 @@
         self._log_chat_finished(model=model)

-        return Prompt.assistant(
-            *response_content_blocks, stop_reason=stop_reason, tool_calls=tool_calls
+        channels: dict[str, list[Any]] | None = None
+        if thinking_segments:
+            channels = {REASONING: [TextContent(type="text", text="".join(thinking_segments))]}
+        elif response.content:
+            thinking_texts = [
+                block.thinking
+                for block in response.content
+                if isinstance(block, ThinkingBlock) and block.thinking
+            ]
+            if thinking_texts:
+                channels = {REASONING: [TextContent(type="text", text="".join(thinking_texts))]}
+
+        raw_thinking_blocks = []
+        if response.content:
+            raw_thinking_blocks = [
+                block
+                for block in response.content
+                if isinstance(block, (ThinkingBlock, RedactedThinkingBlock))
+            ]
+        if raw_thinking_blocks:
+            if channels is None:
+                channels = {}
+            channels[ANTHROPIC_THINKING_BLOCKS] = raw_thinking_blocks
+
+        return PromptMessageExtended(
+            role="assistant",
+            content=response_content_blocks,
+            tool_calls=tool_calls,
+            channels=channels,
+            stop_reason=stop_reason,
         )

     async def _apply_prompt_provider_specific(
diff --git a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py
index 9f532578..23bfa450 100644
--- a/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py
+++ b/src/fast_agent/llm/provider/anthropic/multipart_converter_anthropic.py
@@ -10,7 +10,10 @@
     ImageBlockParam,
     MessageParam,
     PlainTextSourceParam,
+    RedactedThinkingBlock,
     TextBlockParam,
+    ThinkingBlock,
+    ThinkingBlockParam,
     ToolResultBlockParam,
     ToolUseBlockParam,
     URLImageSourceParam,
@@ -27,6 +30,7 @@
     TextResourceContents,
 )

+from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS
 from fast_agent.core.logging.logger import get_logger
 from fast_agent.mcp.helpers.content_helpers import (
     get_image_data,
@@ -76,12 +80,33 @@ def convert_to_anthropic(multipart_msg: PromptMessageExtended) -> MessageParam:
             An Anthropic API MessageParam object
         """
         role = multipart_msg.role
-        all_content_blocks = []
+        all_content_blocks: list = []

         # If this is an assistant message that contains tool_calls, convert
         # those into Anthropic tool_use blocks so the next user message can
         # legally include corresponding tool_result blocks.
         if role == "assistant" and multipart_msg.tool_calls:
+            # CRITICAL: Thinking blocks must come FIRST in assistant messages
+            # when using extended thinking with tool use
+            if multipart_msg.channels:
+                raw_thinking = multipart_msg.channels.get(ANTHROPIC_THINKING_BLOCKS)
+                if raw_thinking:
+                    for thinking_block in raw_thinking:
+                        # Pass through raw ThinkingBlock/RedactedThinkingBlock
+                        # These contain signatures needed for API verification
+                        if isinstance(thinking_block, ThinkingBlock):
+                            all_content_blocks.append(
+                                ThinkingBlockParam(
+                                    type="thinking",
+                                    thinking=thinking_block.thinking,
+                                    signature=thinking_block.signature,
+                                )
+                            )
+                        elif isinstance(thinking_block, RedactedThinkingBlock):
+                            # Redacted thinking blocks are passed as-is
+                            # They contain encrypted data that the API can verify
+                            all_content_blocks.append(thinking_block)
+
             for tool_use_id, req in multipart_msg.tool_calls.items():
                 sanitized_id = AnthropicConverter._sanitize_tool_id(tool_use_id)
                 params = req.params
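
The converter change above exists because, when extended thinking and tool use are combined, the assistant turn replayed to the API must start with the signed thinking blocks. A hedged sketch of the message shape it produces (the IDs, tool name, inputs, and signature below are made up):

    # Hypothetical result of convert_to_anthropic() for an assistant turn that carried
    # both a raw thinking block and a tool call; the thinking block must come first.
    assistant_param = {
        "role": "assistant",
        "content": [
            {"type": "thinking", "thinking": "The user wants the weather...", "signature": "EuYB..."},
            {"type": "tool_use", "id": "toolu_01ABC", "name": "get_weather", "input": {"city": "Paris"}},
        ],
    }
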
diff --git a/src/fast_agent/llm/usage_tracking.py b/src/fast_agent/llm/usage_tracking.py
index e2672528..ffc491f5 100644
--- a/src/fast_agent/llm/usage_tracking.py
+++ b/src/fast_agent/llm/usage_tracking.py
@@ -149,6 +149,10 @@ def from_anthropic(cls, usage: AnthropicUsage, model: str) -> "TurnUsage":
             cache_write_tokens=cache_creation_tokens,  # Tokens written to cache (25% surcharge)
         )

+        # Extract thinking/reasoning tokens if available (extended thinking feature)
+        # Note: For Claude 4 models, you're billed for full thinking tokens, not summaries
+        thinking_tokens = getattr(usage, "thinking_tokens", 0) or 0
+
         return cls(
             provider=Provider.ANTHROPIC,
             model=model,
@@ -156,6 +160,7 @@
             output_tokens=usage.output_tokens,
             total_tokens=usage.input_tokens + usage.output_tokens,
             cache_usage=cache_usage,
+            reasoning_tokens=thinking_tokens,
             raw_usage=usage,  # Store the original Anthropic usage object
         )
diff --git a/src/fast_agent/mcp/prompt_message_extended.py b/src/fast_agent/mcp/prompt_message_extended.py
index 5ad16d9c..4e8e41f7 100644
--- a/src/fast_agent/mcp/prompt_message_extended.py
+++ b/src/fast_agent/mcp/prompt_message_extended.py
@@ -1,4 +1,4 @@
-from typing import Mapping, Sequence
+from typing import Any, Mapping, Sequence

 from mcp.types import (
     CallToolRequest,
@@ -27,7 +27,8 @@ class PromptMessageExtended(BaseModel):
     content: list[ContentBlock] = []
     tool_calls: dict[str, CallToolRequest] | None = None
     tool_results: dict[str, CallToolResult] | None = None
-    channels: Mapping[str, Sequence[ContentBlock]] | None = None
+    # Channels can carry provider-specific payloads (e.g., raw Anthropic thinking blocks).
+    channels: Mapping[str, Sequence[Any]] | None = None
     stop_reason: LlmStopReason | None = None
     is_template: bool = False
diff --git a/src/fast_agent/ui/interactive_prompt.py b/src/fast_agent/ui/interactive_prompt.py
index ce16ce2f..06ab1943 100644
--- a/src/fast_agent/ui/interactive_prompt.py
+++ b/src/fast_agent/ui/interactive_prompt.py
@@ -1398,7 +1398,7 @@ async def _show_markdown(self, prompt_provider: "AgentApp", agent_name: str) ->
             rich_print("[yellow]No message history available[/yellow]")
             return

-        message_history = agent.llm.message_history
+        message_history = agent.message_history
         if not message_history:
             rich_print("[yellow]No messages in history[/yellow]")
             return
diff --git a/src/fast_agent/ui/streaming.py b/src/fast_agent/ui/streaming.py
index bec39233..b2076579 100644
--- a/src/fast_agent/ui/streaming.py
+++ b/src/fast_agent/ui/streaming.py
@@ -124,6 +124,7 @@ def __init__(
         self._has_reasoning = False
         self._reasoning_active = False
         self._tool_active = False
+        self._render_reasoning_stream = True

         if self._async_mode and self._loop and self._queue is not None:
             self._worker_task = self._loop.create_task(self._render_worker())
@@ -582,12 +583,15 @@ def _process_reasoning_chunk(self, chunk: str) -> bool:

         for segment in segments:
             if segment.is_thinking:
-                self._begin_reasoning_mode()
-                self._append_plain_text(segment.text, is_reasoning=True)
+                if self._render_reasoning_stream:
+                    self._begin_reasoning_mode()
+                    self._append_plain_text(segment.text, is_reasoning=True)
                 handled = True
             else:
                 if self._reasoning_active:
                     self._end_reasoning_mode()
+                if self._render_reasoning_stream and self._has_reasoning:
+                    self._drop_reasoning_stream()
                 emitted_non_thinking = True
                 self._append_text_in_current_mode(segment.text)
                 handled = True
@@ -610,14 +614,25 @@ def _handle_stream_chunk(self, chunk: StreamChunk) -> bool:
             return True

         if chunk.is_reasoning:
-            self._begin_reasoning_mode()
-            return self._append_plain_text(chunk.text, is_reasoning=True)
+            if self._render_reasoning_stream:
+                self._begin_reasoning_mode()
+                return self._append_plain_text(chunk.text, is_reasoning=True)
+            return False

+        if self._render_reasoning_stream and self._has_reasoning:
+            self._drop_reasoning_stream()
         if self._reasoning_active:
             self._end_reasoning_mode()

         return self._append_text_in_current_mode(chunk.text)

+    def _drop_reasoning_stream(self) -> None:
+        self._render_reasoning_stream = False
+        self._has_reasoning = False
+        self._reasoning_active = False
+        self._styled_buffer.clear()
+        self._buffer.clear()
+
     def _handle_chunk(self, chunk: str) -> bool:
         if not chunk:
             return False
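
Downstream code can read the captured reasoning back off the returned message through the channels introduced above. A small sketch, assuming the channel layout from this diff (the helper itself is hypothetical, not part of the change):

    # Hypothetical helper: pull the aggregated reasoning text off a PromptMessageExtended.
    from fast_agent.constants import REASONING

    def reasoning_text(message) -> str:
        if not message.channels:
            return ""
        blocks = message.channels.get(REASONING) or []
        # REASONING holds TextContent items; ANTHROPIC_THINKING_BLOCKS holds raw SDK blocks.
        return "".join(getattr(block, "text", "") for block in blocks)
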