7 changes: 7 additions & 0 deletions apps/backend/agents/base.py
@@ -13,3 +13,10 @@
# Configuration constants
AUTO_CONTINUE_DELAY_SECONDS = 3
HUMAN_INTERVENTION_FILE = "PAUSE"

# Retry configuration for 400 tool concurrency errors
MAX_CONCURRENCY_RETRIES = 5  # Maximum number of retries for tool concurrency errors
INITIAL_RETRY_DELAY_SECONDS = (
    2  # Initial retry delay (doubles each retry: 2s, 4s, 8s, 16s, 32s)
)
MAX_RETRY_DELAY_SECONDS = 32  # Cap retry delay at 32 seconds
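These constants are not exercised in base.py itself; as a rough illustration (not part of the diff, and using local copies of the values above), the doubling-and-capping schedule they describe works out as follows:

# Illustrative sketch only - mirrors the constants added above; the real
# doubling/capping logic is added to coder.py later in this diff.
MAX_CONCURRENCY_RETRIES = 5
INITIAL_RETRY_DELAY_SECONDS = 2
MAX_RETRY_DELAY_SECONDS = 32

def concurrency_retry_delays() -> list[int]:
    """Return the delay (in seconds) used before each retry."""
    delays, delay = [], INITIAL_RETRY_DELAY_SECONDS
    for _ in range(MAX_CONCURRENCY_RETRIES):
        delays.append(delay)
        delay = min(delay * 2, MAX_RETRY_DELAY_SECONDS)
    return delays

print(concurrency_retry_delays())  # [2, 4, 8, 16, 32] - roughly 62s of waiting in total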
140 changes: 134 additions & 6 deletions apps/backend/agents/coder.py
@@ -56,7 +56,13 @@
    print_status,
)

from .base import AUTO_CONTINUE_DELAY_SECONDS, HUMAN_INTERVENTION_FILE
from .base import (
    AUTO_CONTINUE_DELAY_SECONDS,
    HUMAN_INTERVENTION_FILE,
    INITIAL_RETRY_DELAY_SECONDS,
    MAX_CONCURRENCY_RETRIES,
    MAX_RETRY_DELAY_SECONDS,
)
from .memory_manager import debug_memory_system_status, get_graphiti_context
from .session import post_session_processing, run_agent_session
from .utils import (
@@ -215,6 +221,18 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:

    # Main loop
    iteration = 0
    consecutive_concurrency_errors = 0  # Track consecutive 400 tool concurrency errors
    current_retry_delay = INITIAL_RETRY_DELAY_SECONDS  # Exponential backoff delay
    concurrency_error_context: str | None = (
        None  # Context to pass to agent after concurrency error
    )

    def _reset_concurrency_state() -> None:
        """Reset concurrency error tracking state after a successful session or non-concurrency error."""
        nonlocal consecutive_concurrency_errors, current_retry_delay, concurrency_error_context
        consecutive_concurrency_errors = 0
        current_retry_delay = INITIAL_RETRY_DELAY_SECONDS
        concurrency_error_context = None

    while True:
        iteration += 1
@@ -405,6 +423,14 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:
            prompt += "\n\n" + graphiti_context
            print_status("Graphiti memory context loaded", "success")

        # Add concurrency error context if recovering from 400 error
        if concurrency_error_context:
            prompt += "\n\n" + concurrency_error_context
            print_status(
                f"Added tool concurrency error context (retry {consecutive_concurrency_errors}/{MAX_CONCURRENCY_RETRIES})",
                "warning",
            )

        # Show what we're working on
        print(f"Working on: {highlight(subtask_id)}")
        print(f"Description: {next_subtask.get('description', 'No description')}")
@@ -419,7 +445,7 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:

        # Run session with async context manager
        async with client:
            status, response = await run_agent_session(
            status, response, error_info = await run_agent_session(
                client, prompt, spec_dir, verbose, phase=current_log_phase
            )

@@ -512,6 +538,9 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:
            print_build_complete_banner(spec_dir)
            status_manager.update(state=BuildState.COMPLETE)

            # Reset error tracking on success
            _reset_concurrency_state()

            if task_logger:
                task_logger.end_phase(
                    LogPhase.CODING,
@@ -526,6 +555,9 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:
            break

        elif status == "continue":
            # Reset error tracking on successful session
            _reset_concurrency_state()

            print(
                muted(
                    f"\nAgent will auto-continue in {AUTO_CONTINUE_DELAY_SECONDS}s..."
@@ -556,10 +588,106 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:

        elif status == "error":
            emit_phase(ExecutionPhase.FAILED, "Session encountered an error")
            print_status("Session encountered an error", "error")
            print(muted("Will retry with a fresh session..."))
            status_manager.update(state=BuildState.ERROR)
            await asyncio.sleep(AUTO_CONTINUE_DELAY_SECONDS)

            # Check if this is a tool concurrency error (400)
            is_concurrency_error = (
                error_info and error_info.get("type") == "tool_concurrency"
            )

            if is_concurrency_error:
                consecutive_concurrency_errors += 1

                # Check if we've exceeded max retries (allow 5 retries with delays: 2s, 4s, 8s, 16s, 32s)
                if consecutive_concurrency_errors > MAX_CONCURRENCY_RETRIES:
                    print_status(
                        f"Tool concurrency limit hit {consecutive_concurrency_errors} times consecutively",
                        "error",
                    )
                    print()
                    print("=" * 70)
                    print(" CRITICAL: Agent stuck in retry loop")
                    print("=" * 70)
                    print()
                    print(
                        "The agent is repeatedly hitting Claude API's tool concurrency limit."
                    )
                    print(
                        "This usually means the agent is trying to use too many tools at once."
                    )
                    print()
                    print("Possible solutions:")
                    print(" 1. The agent needs to reduce tool usage per request")
                    print(" 2. Break down the current subtask into smaller steps")
                    print(" 3. Manual intervention may be required")
                    print()
                    print(f"Error: {error_info.get('message', 'Unknown error')[:200]}")
                    print()

                    # Mark current subtask as stuck if we have one
                    if subtask_id:
                        recovery_manager.mark_subtask_stuck(
                            subtask_id,
                            f"Tool concurrency errors after {consecutive_concurrency_errors} retries",
                        )
                        print_status(f"Subtask {subtask_id} marked as STUCK", "error")

                    status_manager.update(state=BuildState.ERROR)
                    break  # Exit the loop

                # Exponential backoff: 2s, 4s, 8s, 16s, 32s
                print_status(
                    f"Tool concurrency error (retry {consecutive_concurrency_errors}/{MAX_CONCURRENCY_RETRIES})",
                    "warning",
                )
                print(
                    muted(
                        f"Waiting {current_retry_delay}s before retry (exponential backoff)..."
                    )
                )
                print()

                # Set context for next retry so agent knows to adjust behavior
                error_context_message = (
                    "## CRITICAL: TOOL CONCURRENCY ERROR\n\n"
                    f"Your previous session hit Claude API's tool concurrency limit (HTTP 400).\n"
                    f"This is retry {consecutive_concurrency_errors}/{MAX_CONCURRENCY_RETRIES}.\n\n"
                    "**IMPORTANT: You MUST adjust your approach:**\n"
                    "1. Use ONE tool at a time - do NOT call multiple tools in parallel\n"
                    "2. Wait for each tool result before calling the next tool\n"
                    "3. Avoid starting with `pwd` or multiple Read calls at once\n"
                    "4. If you need to read multiple files, read them one by one\n"
                    "5. Take a more incremental, step-by-step approach\n\n"
                    "Start by focusing on ONE specific action for this subtask."
                )
Contributor review comment on lines +653 to +664 (severity: medium):
This large, multi-line string for the agent's error prompt is defined directly within the main agent loop. For better maintainability and separation of concerns, it's a good practice to extract prompt templates from the core logic.

Consider moving this template to a dedicated prompts module (e.g., prompts.py) and generating it with a helper function. This would make the core agent logic cleaner and centralize prompt management.

For example, in a prompts module:

def generate_concurrency_error_prompt(retry_count: int, max_retries: int) -> str:
    return f'''## CRITICAL: TOOL CONCURRENCY ERROR

Your previous session hit Claude API's tool concurrency limit (HTTP 400).
This is retry {retry_count}/{max_retries}.

**IMPORTANT: You MUST adjust your approach:**
1. Use ONE tool at a time - do NOT call multiple tools in parallel
2. Wait for each tool result before calling the next tool
3. Avoid starting with `pwd` or multiple Read calls at once
4. If you need to read multiple files, read them one by one
5. Take a more incremental, step-by-step approach

Start by focusing on ONE specific action for this subtask.'''
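With such a helper in place, the call site in coder.py could shrink to a single call (hypothetical sketch - neither prompts.py nor generate_concurrency_error_prompt exists in this diff):

from .prompts import generate_concurrency_error_prompt  # hypothetical module

error_context_message = generate_concurrency_error_prompt(
    consecutive_concurrency_errors, MAX_CONCURRENCY_RETRIES
)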


                # If we're in planning phase, reset first_run to True so next iteration
                # re-enters the planning branch (fix for issue #1565)
                if current_log_phase == LogPhase.PLANNING:
                    first_run = True
                    planning_retry_context = error_context_message
                    print_status(
                        "Planning session failed - will retry planning", "warning"
                    )
                else:
                    concurrency_error_context = error_context_message

                status_manager.update(state=BuildState.ERROR)
                await asyncio.sleep(current_retry_delay)

                # Double the retry delay for next time (cap at MAX_RETRY_DELAY_SECONDS)
                current_retry_delay = min(
                    current_retry_delay * 2, MAX_RETRY_DELAY_SECONDS
                )

            else:
                # Other errors - use standard retry logic
                print_status("Session encountered an error", "error")
                print(muted("Will retry with a fresh session..."))
                status_manager.update(state=BuildState.ERROR)
                await asyncio.sleep(AUTO_CONTINUE_DELAY_SECONDS)

                # Reset concurrency error tracking on non-concurrency errors
                _reset_concurrency_state()

        # Small delay between sessions
        if max_iterations is None or iteration < max_iterations:
2 changes: 1 addition & 1 deletion apps/backend/agents/planner.py
@@ -111,7 +111,7 @@ async def run_followup_planner(
    try:
        # Run single planning session
        async with client:
            status, response = await run_agent_session(
            status, response, error_info = await run_agent_session(
                client, prompt, spec_dir, verbose, phase=LogPhase.PLANNING
            )

62 changes: 53 additions & 9 deletions apps/backend/agents/session.py
@@ -46,6 +46,28 @@
logger = logging.getLogger(__name__)


def is_tool_concurrency_error(error: Exception) -> bool:
    """
    Check if an error is a 400 tool concurrency error from Claude API.

    Tool concurrency errors occur when too many tools are used simultaneously
    in a single API request, hitting Claude's concurrent tool use limit.

    Args:
        error: The exception to check

    Returns:
        True if this is a tool concurrency error, False otherwise
    """
    error_str = str(error).lower()
    # Check for 400 status AND tool concurrency keywords
    return "400" in error_str and (
        ("tool" in error_str and "concurrency" in error_str)
        or "too many tools" in error_str
        or "concurrent tool" in error_str
    )


async def post_session_processing(
    spec_dir: Path,
    project_dir: Path,
@@ -317,7 +339,7 @@ async def run_agent_session(
    spec_dir: Path,
    verbose: bool = False,
    phase: LogPhase = LogPhase.CODING,
) -> tuple[str, str]:
) -> tuple[str, str, dict]:
    """
    Run a single agent session using Claude Agent SDK.

@@ -329,10 +351,13 @@
        phase: Current execution phase for logging

    Returns:
        (status, response_text) where status is:
        - "continue" if agent should continue working
        - "complete" if all subtasks complete
        - "error" if an error occurred
        (status, response_text, error_info) where:
        - status: "continue", "complete", or "error"
        - response_text: Agent's response text
        - error_info: Dict with error details (empty if no error):
            - "type": "tool_concurrency" or "other"
            - "message": Error message string
            - "exception_type": Exception class name string
    """
    debug_section("session", f"Agent Session - {phase.value}")
    debug(
@@ -529,7 +554,7 @@ async def run_agent_session(
tool_count=tool_count,
response_length=len(response_text),
)
return "complete", response_text
return "complete", response_text, {}

debug_success(
"session",
@@ -538,17 +563,36 @@ async def run_agent_session(
tool_count=tool_count,
response_length=len(response_text),
)
return "continue", response_text
return "continue", response_text, {}

    except Exception as e:
        # Detect specific error types for better retry handling
        is_concurrency = is_tool_concurrency_error(e)
        error_type = "tool_concurrency" if is_concurrency else "other"

        debug_error(
            "session",
            f"Session error: {e}",
            exception_type=type(e).__name__,
            error_category=error_type,
            message_count=message_count,
            tool_count=tool_count,
        )
        print(f"Error during agent session: {e}")

        # Log concurrency errors prominently
        if is_concurrency:
            print("\n⚠️ Tool concurrency limit reached (400 error)")
            print(" Claude API limits concurrent tool use in a single request")
            print(f" Error: {str(e)[:200]}\n")
        else:
            print(f"Error during agent session: {e}")

        if task_logger:
            task_logger.log_error(f"Session error: {e}", phase)
        return "error", str(e)

        error_info = {
            "type": error_type,
            "message": str(e),
            "exception_type": type(e).__name__,
        }
        return "error", str(e), error_info
17 changes: 13 additions & 4 deletions apps/backend/core/client.py
@@ -490,19 +490,28 @@ def create_client(
       (see security.py for ALLOWED_COMMANDS)
    4. Tool filtering - Each agent type only sees relevant tools (prevents misuse)
    """
    # Get OAuth token - Claude CLI handles token lifecycle internally
    oauth_token = require_auth_token()
    # Collect env vars to pass to SDK (ANTHROPIC_BASE_URL, CLAUDE_CONFIG_DIR, etc.)
    sdk_env = get_sdk_env_vars()

    # Get the config dir for profile-specific credential lookup
    # CLAUDE_CONFIG_DIR enables per-profile Keychain entries with SHA256-hashed service names
    config_dir = sdk_env.get("CLAUDE_CONFIG_DIR")

    # Get OAuth token - uses profile-specific Keychain lookup when config_dir is set
    # This correctly reads from "Claude Code-credentials-{hash}" for non-default profiles
    oauth_token = require_auth_token(config_dir)

    # Validate token is not encrypted before passing to SDK
    # Encrypted tokens (enc:...) should have been decrypted by require_auth_token()
    # If we still have an encrypted token here, it means decryption failed or was skipped
    validate_token_not_encrypted(oauth_token)

    # Ensure SDK can access it via its expected env var
    # This is required because the SDK doesn't know about per-profile Keychain naming
    os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token

    # Collect env vars to pass to SDK (ANTHROPIC_BASE_URL, etc.)
    sdk_env = get_sdk_env_vars()
    if config_dir:
        logger.info(f"Using CLAUDE_CONFIG_DIR for profile: {config_dir}")

    # Debug: Log git-bash path detection on Windows
    if "CLAUDE_CODE_GIT_BASH_PATH" in sdk_env:
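For context on the per-profile credential comments above, here is a minimal sketch of how a profile-specific Keychain service name might be derived, assuming the hash is a hex SHA-256 digest of the config directory path (the actual derivation lives inside require_auth_token() and is not part of this diff):

import hashlib

def profile_service_name(config_dir: str | None) -> str:
    # Assumption: the default profile uses the plain service name, and
    # non-default profiles append a SHA-256 hash of the config dir path.
    if not config_dir:
        return "Claude Code-credentials"
    digest = hashlib.sha256(config_dir.encode("utf-8")).hexdigest()
    return f"Claude Code-credentials-{digest}"

# Hypothetical usage:
# profile_service_name("/Users/me/.claude-work") -> "Claude Code-credentials-<64-char hex digest>"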
20 changes: 13 additions & 7 deletions apps/backend/core/simple_client.py
@@ -22,6 +22,7 @@
"""

import logging
import os
from pathlib import Path

from agents.tools_pkg import get_agent_config, get_default_thinking_level
@@ -72,21 +73,26 @@ def create_simple_client(
    Raises:
        ValueError: If agent_type is not found in AGENT_CONFIGS
    """
    # Get authentication
    oauth_token = require_auth_token()
    # Get environment variables for SDK (including CLAUDE_CONFIG_DIR if set)
    sdk_env = get_sdk_env_vars()

    # Get the config dir for profile-specific credential lookup
    # CLAUDE_CONFIG_DIR enables per-profile Keychain entries with SHA256-hashed service names
    config_dir = sdk_env.get("CLAUDE_CONFIG_DIR")

    # Get OAuth token - uses profile-specific Keychain lookup when config_dir is set
    # This correctly reads from "Claude Code-credentials-{hash}" for non-default profiles
    oauth_token = require_auth_token(config_dir)

    # Validate token is not encrypted before passing to SDK
    # Encrypted tokens (enc:...) should have been decrypted by require_auth_token()
    # If we still have an encrypted token here, it means decryption failed or was skipped
    validate_token_not_encrypted(oauth_token)

    import os

    # Ensure SDK can access it via its expected env var
    # This is required because the SDK doesn't know about per-profile Keychain naming
    os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token

    # Get environment variables for SDK
    sdk_env = get_sdk_env_vars()

    # Get agent configuration (raises ValueError if unknown type)
    config = get_agent_config(agent_type)
