14 changes: 14 additions & 0 deletions src/fast_agent/config.py
@@ -296,6 +296,20 @@ class AnthropicSettings(BaseModel):
- "auto": Currently same as "prompt" - caches tools+system prompt (1 block) and template content.
"""

thinking_enabled: bool = False
"""
Enable extended thinking for supported Claude models (Claude 3.7 Sonnet, Sonnet 4+, Opus 4+).
When enabled, Claude will show its step-by-step reasoning process.
Note: Extended thinking is incompatible with structured output (forced tool choice).
"""

thinking_budget_tokens: int = 10000
"""
Maximum tokens for Claude's internal reasoning process (minimum 1024).
Larger budgets enable more thorough analysis for complex problems.
Must be less than max_tokens.
"""

model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)


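A minimal configuration sketch, assuming the remaining AnthropicSettings fields keep their defaults (field names are from this diff):

    from fast_agent.config import AnthropicSettings

    settings = AnthropicSettings(
        thinking_enabled=True,  # off by default
        thinking_budget_tokens=20000,  # clamped to >= 1024 at request time; keep below max_tokens
    )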
2 changes: 2 additions & 0 deletions src/fast_agent/constants.py
@@ -6,6 +6,8 @@
HUMAN_INPUT_TOOL_NAME = "__human_input"
MCP_UI = "mcp-ui"
REASONING = "reasoning"
ANTHROPIC_THINKING_BLOCKS = "anthropic-thinking-raw"
"""Raw Anthropic thinking blocks with signatures for tool use passback."""
FAST_AGENT_ERROR_CHANNEL = "fast-agent-error"
FAST_AGENT_REMOVED_METADATA_CHANNEL = "fast-agent-removed-meta"
FAST_AGENT_TIMING = "fast-agent-timing"
23 changes: 18 additions & 5 deletions src/fast_agent/llm/model_database.py
@@ -158,10 +158,23 @@ class ModelDatabase:
)

ANTHROPIC_OPUS_4_VERSIONED = ModelParameters(
context_window=200000, max_output_tokens=32000, tokenizes=ANTHROPIC_MULTIMODAL
context_window=200000,
max_output_tokens=32000,
tokenizes=ANTHROPIC_MULTIMODAL,
reasoning="anthropic_thinking",
)
ANTHROPIC_SONNET_4_VERSIONED = ModelParameters(
context_window=200000, max_output_tokens=64000, tokenizes=ANTHROPIC_MULTIMODAL
context_window=200000,
max_output_tokens=64000,
tokenizes=ANTHROPIC_MULTIMODAL,
reasoning="anthropic_thinking",
)
# Claude 3.7 Sonnet supports extended thinking (the model is deprecated but still available)
ANTHROPIC_37_SERIES_THINKING = ModelParameters(
context_window=200000,
max_output_tokens=16384,
tokenizes=ANTHROPIC_MULTIMODAL,
reasoning="anthropic_thinking",
)

DEEPSEEK_CHAT_STANDARD = ModelParameters(
@@ -324,9 +337,9 @@ class ModelDatabase:
"claude-3-5-sonnet-20240620": ANTHROPIC_35_SERIES,
"claude-3-5-sonnet-20241022": ANTHROPIC_35_SERIES,
"claude-3-5-sonnet-latest": ANTHROPIC_35_SERIES,
"claude-3-7-sonnet": ANTHROPIC_37_SERIES,
"claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES,
"claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES,
"claude-3-7-sonnet": ANTHROPIC_37_SERIES_THINKING,
"claude-3-7-sonnet-20250219": ANTHROPIC_37_SERIES_THINKING,
"claude-3-7-sonnet-latest": ANTHROPIC_37_SERIES_THINKING,
"claude-sonnet-4-0": ANTHROPIC_SONNET_4_VERSIONED,
"claude-sonnet-4-20250514": ANTHROPIC_SONNET_4_VERSIONED,
"claude-sonnet-4-5": ANTHROPIC_SONNET_4_VERSIONED,
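With these table entries, thinking support becomes a per-model lookup. A small sketch, assuming ModelDatabase.get_reasoning (used later in this diff) returns the reasoning field, or None when a model has none:

    from fast_agent.llm.model_database import ModelDatabase

    assert ModelDatabase.get_reasoning("claude-3-7-sonnet-latest") == "anthropic_thinking"
    # Assumption: models without a reasoning field (e.g. the 3.5 series) return None.
    assert ModelDatabase.get_reasoning("claude-3-5-sonnet-latest") is None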
127 changes: 118 additions & 9 deletions src/fast_agent/llm/provider/anthropic/llm_anthropic.py
@@ -15,9 +15,13 @@
RawContentBlockStartEvent,
RawContentBlockStopEvent,
RawMessageDeltaEvent,
RedactedThinkingBlock,
SignatureDelta,
TextBlock,
TextBlockParam,
TextDelta,
ThinkingBlock,
ThinkingDelta,
ToolParam,
ToolUseBlock,
ToolUseBlockParam,
@@ -32,7 +36,7 @@
TextContent,
)

from fast_agent.constants import FAST_AGENT_ERROR_CHANNEL
from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS, FAST_AGENT_ERROR_CHANNEL, REASONING
from fast_agent.core.exceptions import ProviderKeyError
from fast_agent.core.logging.logger import get_logger
from fast_agent.core.prompt import Prompt
@@ -137,6 +141,23 @@ def _get_cache_mode(self) -> str:
cache_mode = self.context.config.anthropic.cache_mode
return cache_mode

def _is_thinking_enabled(self, model: str) -> bool:
"""Check if extended thinking should be enabled for this request."""
from fast_agent.llm.model_database import ModelDatabase

if ModelDatabase.get_reasoning(model) != "anthropic_thinking":
return False
if self.context.config and self.context.config.anthropic:
return self.context.config.anthropic.thinking_enabled
return False

def _get_thinking_budget(self) -> int:
"""Get the thinking budget tokens (minimum 1024)."""
if self.context.config and self.context.config.anthropic:
budget = getattr(self.context.config.anthropic, "thinking_budget_tokens", 10000)
return max(1024, budget)
return 10000

async def _prepare_tools(
self, structured_model: Type[ModelT] | None = None, tools: list[Tool] | None = None
) -> list[ToolParam]:
@@ -280,11 +301,13 @@ async def _process_stream(
stream: AsyncMessageStream,
model: str,
capture_filename: Path | None = None,
) -> Message:
) -> tuple[Message, list[str]]:
"""Process the streaming response and display real-time token usage."""
# Track estimated output tokens by counting text chunks
estimated_tokens = 0
tool_streams: dict[int, dict[str, Any]] = {}
thinking_segments: list[str] = []
thinking_indices: set[int] = set()

try:
# Process the raw event stream to get token counts
@@ -295,6 +318,9 @@

if isinstance(event, RawContentBlockStartEvent):
content_block = event.content_block
if isinstance(content_block, (ThinkingBlock, RedactedThinkingBlock)):
thinking_indices.add(event.index)
continue
if isinstance(content_block, ToolUseBlock):
tool_streams[event.index] = {
"name": content_block.name,
@@ -324,6 +350,15 @@

if isinstance(event, RawContentBlockDeltaEvent):
delta = event.delta
if isinstance(delta, ThinkingDelta):
if delta.thinking:
self._notify_stream_listeners(
StreamChunk(text=delta.thinking, is_reasoning=True)
)
thinking_segments.append(delta.thinking)
continue
if isinstance(delta, SignatureDelta):
continue
if isinstance(delta, InputJSONDelta):
info = tool_streams.get(event.index)
if info is not None:
@@ -349,6 +384,10 @@
)
continue

if isinstance(event, RawContentBlockStopEvent) and event.index in thinking_indices:
thinking_indices.discard(event.index)
continue

if isinstance(event, RawContentBlockStopEvent) and event.index in tool_streams:
info = tool_streams.pop(event.index)
preview_raw = "".join(info.get("buffer", []))
@@ -428,7 +467,7 @@ async def _process_stream(
f"Streaming complete - Model: {model}, Input tokens: {message.usage.input_tokens}, Output tokens: {message.usage.output_tokens}"
)

return message
return message, thinking_segments
except APIError as error:
logger.error("Streaming APIError during Anthropic completion", exc_info=error)
raise # Re-raise to be handled by _anthropic_completion
@@ -571,11 +610,34 @@ async def _anthropic_completion(
base_args["system"] = self.instruction or params.systemPrompt

if structured_model:
if self._is_thinking_enabled(model):
logger.warning(
"Extended thinking is incompatible with structured output. "
"Disabling thinking for this request."
)
base_args["tool_choice"] = {"type": "tool", "name": STRUCTURED_OUTPUT_TOOL_NAME}

if params.maxTokens is not None:
thinking_enabled = self._is_thinking_enabled(model)
if thinking_enabled and structured_model:
thinking_enabled = False

if thinking_enabled:
thinking_budget = self._get_thinking_budget()
base_args["thinking"] = {
"type": "enabled",
"budget_tokens": thinking_budget,
}
current_max = params.maxTokens or 16000
if current_max <= thinking_budget:
base_args["max_tokens"] = thinking_budget + 8192
else:
base_args["max_tokens"] = current_max
elif params.maxTokens is not None:
base_args["max_tokens"] = params.maxTokens

if thinking_enabled and available_tools:
base_args["extra_headers"] = {"anthropic-beta": "interleaved-thinking-2025-05-14"}
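# Worked example with illustrative values: thinking_budget_tokens=10000 and
# params.maxTokens=8192 gives current_max <= budget, so max_tokens is raised to
# 18192 (10000 + 8192), leaving room for the visible answer after thinking.
# The request then carries thinking={"type": "enabled", "budget_tokens": 10000}
# and, when tools are present, the interleaved-thinking beta header set above.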

self._log_chat_progress(self.chat_turn(), model=model)
# Use the base class method to prepare all arguments with Anthropic-specific exclusions
# Do this BEFORE applying cache control so metadata doesn't override cached fields
@@ -613,7 +675,9 @@ async def _anthropic_completion(
try:
async with anthropic.messages.stream(**arguments) as stream:
# Process the stream
response = await self._process_stream(stream, model, capture_filename)
response, thinking_segments = await self._process_stream(
stream, model, capture_filename
)
except asyncio.CancelledError as e:
reason = str(e) if e.args else "cancelled"
logger.info(f"Anthropic completion cancelled: {reason}")
@@ -658,8 +722,12 @@

response_as_message = self.convert_message_to_message_param(response)
messages.append(response_as_message)
if response.content and response.content[0].type == "text":
response_content_blocks.append(TextContent(type="text", text=response.content[0].text))
if response.content:
for content_block in response.content:
if isinstance(content_block, TextBlock):
response_content_blocks.append(
TextContent(type="text", text=content_block.text)
)

stop_reason: LlmStopReason = LlmStopReason.END_TURN

@@ -691,8 +759,49 @@

self._log_chat_finished(model=model)

return Prompt.assistant(
*response_content_blocks, stop_reason=stop_reason, tool_calls=tool_calls
channels: dict[str, list[Any]] | None = None
if thinking_segments:
channels = {REASONING: [TextContent(type="text", text="".join(thinking_segments))]}
elif response.content:
thinking_texts = [
block.thinking
for block in response.content
if isinstance(block, ThinkingBlock) and block.thinking
]
if thinking_texts:
channels = {REASONING: [TextContent(type="text", text="".join(thinking_texts))]}

raw_thinking_blocks = []
if response.content:
raw_thinking_blocks = [
block
for block in response.content
if isinstance(block, (ThinkingBlock, RedactedThinkingBlock))
]
if raw_thinking_blocks:
if channels is None:
channels = {}
serialized_blocks = []
for block in raw_thinking_blocks:
try:
payload = block.model_dump()
except Exception:
payload = {"type": getattr(block, "type", "thinking")}
if isinstance(block, ThinkingBlock):
payload.update(
{"thinking": block.thinking, "signature": block.signature}
)
elif isinstance(block, RedactedThinkingBlock):
payload.update({"data": block.data})
serialized_blocks.append(TextContent(type="text", text=json.dumps(payload)))
channels[ANTHROPIC_THINKING_BLOCKS] = serialized_blocks

return PromptMessageExtended(
role="assistant",
content=response_content_blocks,
tool_calls=tool_calls,
channels=channels,
stop_reason=stop_reason,
)

async def _apply_prompt_provider_specific(
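Each raw block lands in the ANTHROPIC_THINKING_BLOCKS channel as JSON wrapped in a TextContent. A sketch of the serialized payloads, assuming TextContent is the mcp.types model imported above (signature and data values are illustrative):

    import json
    from mcp.types import TextContent

    thinking = {"type": "thinking", "thinking": "Step 1: ...", "signature": "EqQB..."}
    redacted = {"type": "redacted_thinking", "data": "<opaque encrypted payload>"}
    channel_blocks = [TextContent(type="text", text=json.dumps(p)) for p in (thinking, redacted)]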
@@ -1,3 +1,4 @@
import json
import re
from typing import Literal, Sequence, Union, cast
from urllib.parse import urlparse
@@ -10,7 +11,11 @@
ImageBlockParam,
MessageParam,
PlainTextSourceParam,
RedactedThinkingBlock,
RedactedThinkingBlockParam,
TextBlockParam,
ThinkingBlock,
ThinkingBlockParam,
ToolResultBlockParam,
ToolUseBlockParam,
URLImageSourceParam,
@@ -27,6 +32,7 @@
TextResourceContents,
)

from fast_agent.constants import ANTHROPIC_THINKING_BLOCKS
from fast_agent.core.logging.logger import get_logger
from fast_agent.mcp.helpers.content_helpers import (
get_image_data,
@@ -76,12 +82,55 @@ def convert_to_anthropic(multipart_msg: PromptMessageExtended) -> MessageParam:
An Anthropic API MessageParam object
"""
role = multipart_msg.role
all_content_blocks = []
all_content_blocks: list = []

# If this is an assistant message that contains tool_calls, convert
# those into Anthropic tool_use blocks so the next user message can
# legally include corresponding tool_result blocks.
if role == "assistant" and multipart_msg.tool_calls:
# CRITICAL: Thinking blocks must come FIRST in assistant messages
# when using extended thinking with tool use
if multipart_msg.channels:
raw_thinking = multipart_msg.channels.get(ANTHROPIC_THINKING_BLOCKS)
if raw_thinking:
for thinking_block in raw_thinking:
# Pass through raw ThinkingBlock/RedactedThinkingBlock
# These contain signatures needed for API verification
if isinstance(thinking_block, ThinkingBlock):
all_content_blocks.append(
ThinkingBlockParam(
type="thinking",
thinking=thinking_block.thinking,
signature=thinking_block.signature,
)
)
elif isinstance(thinking_block, RedactedThinkingBlock):
# Redacted thinking blocks are passed as-is
# They contain encrypted data that the API can verify
all_content_blocks.append(thinking_block)
elif isinstance(thinking_block, TextContent):
try:
payload = json.loads(thinking_block.text)
except (TypeError, json.JSONDecodeError):
payload = None
if isinstance(payload, dict):
block_type = payload.get("type")
if block_type == "thinking":
all_content_blocks.append(
ThinkingBlockParam(
type="thinking",
thinking=payload.get("thinking", ""),
signature=payload.get("signature", ""),
)
)
elif block_type == "redacted_thinking":
all_content_blocks.append(
RedactedThinkingBlockParam(
type="redacted_thinking",
data=payload.get("data", ""),
)
)

for tool_use_id, req in multipart_msg.tool_calls.items():
sanitized_id = AnthropicConverter._sanitize_tool_id(tool_use_id)
params = req.params
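Round-trip sketch for the deserialization branch above: a serialized thinking payload is rebuilt into a ThinkingBlockParam so it can precede the tool_use blocks in the assistant message (JSON values are illustrative; the surrounding message assembly is elided):

    import json
    from anthropic.types import ThinkingBlockParam

    payload = json.loads('{"type": "thinking", "thinking": "Step 1: ...", "signature": "EqQB..."}')
    if payload.get("type") == "thinking":
        block = ThinkingBlockParam(
            type="thinking",
            thinking=payload.get("thinking", ""),
            signature=payload.get("signature", ""),
        )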
5 changes: 5 additions & 0 deletions src/fast_agent/llm/usage_tracking.py
@@ -149,13 +149,18 @@ def from_anthropic(cls, usage: AnthropicUsage, model: str) -> "TurnUsage":
cache_write_tokens=cache_creation_tokens, # Tokens written to cache (25% surcharge)
)

# Extract thinking/reasoning tokens if available (extended thinking feature)
# Note: For Claude 4 models, you're billed for full thinking tokens, not summaries
thinking_tokens = getattr(usage, "thinking_tokens", 0) or 0

return cls(
provider=Provider.ANTHROPIC,
model=model,
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
total_tokens=usage.input_tokens + usage.output_tokens,
cache_usage=cache_usage,
reasoning_tokens=thinking_tokens,
raw_usage=usage, # Store the original Anthropic usage object
)

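A small sketch of the defensive getattr above: assuming an SDK usage object that omits thinking_tokens (as current Anthropic responses may), reasoning_tokens falls back to 0:

    from types import SimpleNamespace

    usage = SimpleNamespace(input_tokens=100, output_tokens=50)  # no thinking_tokens field
    assert (getattr(usage, "thinking_tokens", 0) or 0) == 0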
1 change: 1 addition & 0 deletions src/fast_agent/mcp/prompt_message_extended.py
@@ -31,6 +31,7 @@ class PromptMessageExtended(BaseModel):
stop_reason: LlmStopReason | None = None
is_template: bool = False


@classmethod
def to_extended(cls, messages: list[PromptMessage]) -> list["PromptMessageExtended"]:
"""Convert a sequence of PromptMessages into PromptMessageExtended objects."""