Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/backend/agents/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@
# Configuration constants
AUTO_CONTINUE_DELAY_SECONDS = 3
HUMAN_INTERVENTION_FILE = "PAUSE"

# Retry policy for HTTP 400 tool concurrency errors.
# The delay starts at INITIAL_RETRY_DELAY_SECONDS and doubles after every
# retry (2s, 4s, 8s, 16s, 32s), never exceeding MAX_RETRY_DELAY_SECONDS.
MAX_CONCURRENCY_RETRIES = 5  # Give up after this many consecutive retries
INITIAL_RETRY_DELAY_SECONDS = 2  # Delay before the first retry, in seconds
MAX_RETRY_DELAY_SECONDS = 32  # Upper bound for the backoff delay, in seconds
143 changes: 137 additions & 6 deletions apps/backend/agents/coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@
print_status,
)

from .base import AUTO_CONTINUE_DELAY_SECONDS, HUMAN_INTERVENTION_FILE
from .base import (
AUTO_CONTINUE_DELAY_SECONDS,
HUMAN_INTERVENTION_FILE,
INITIAL_RETRY_DELAY_SECONDS,
MAX_CONCURRENCY_RETRIES,
MAX_RETRY_DELAY_SECONDS,
)
from .memory_manager import debug_memory_system_status, get_graphiti_context
from .session import post_session_processing, run_agent_session
from .utils import (
Expand Down Expand Up @@ -215,6 +221,21 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:

# Main loop
iteration = 0
consecutive_concurrency_errors = 0 # Track consecutive 400 tool concurrency errors
current_retry_delay = INITIAL_RETRY_DELAY_SECONDS # Exponential backoff delay
concurrency_error_context: str | None = (
None # Context to pass to agent after concurrency error
)

def _reset_concurrency_state() -> None:
"""Reset concurrency error tracking state after a successful session or non-concurrency error."""
nonlocal \
consecutive_concurrency_errors, \
current_retry_delay, \
concurrency_error_context
consecutive_concurrency_errors = 0
current_retry_delay = INITIAL_RETRY_DELAY_SECONDS
concurrency_error_context = None
Comment on lines +230 to +238
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The _reset_concurrency_state function fails to reset planning_retry_context, which can cause stale error context to be used in subsequent planning attempts after a specific error sequence.
Severity: MEDIUM

Suggested Fix

Add planning_retry_context to the nonlocal statement within the _reset_concurrency_state function and set it to None along with the other state variables being reset.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: apps/backend/agents/coder.py#L230-L238

Potential issue: The function `_reset_concurrency_state` is designed to clear
error-related state but fails to reset the `planning_retry_context` variable. This can
lead to a bug under a specific sequence of events: if a concurrency error occurs during
the planning phase, `planning_retry_context` is set. If a subsequent planning attempt
then fails with a different, non-concurrency error, `_reset_concurrency_state` is called
but does not clear `planning_retry_context`. As a result, the next planning attempt will
incorrectly use the stale error context from the initial concurrency error, potentially
misleading the agent with outdated guidance.

Did we get this right? 👍 / 👎 to inform future reviews.


while True:
iteration += 1
Expand Down Expand Up @@ -405,6 +426,14 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:
prompt += "\n\n" + graphiti_context
print_status("Graphiti memory context loaded", "success")

# Add concurrency error context if recovering from 400 error
if concurrency_error_context:
prompt += "\n\n" + concurrency_error_context
print_status(
f"Added tool concurrency error context (retry {consecutive_concurrency_errors}/{MAX_CONCURRENCY_RETRIES})",
"warning",
)

# Show what we're working on
print(f"Working on: {highlight(subtask_id)}")
print(f"Description: {next_subtask.get('description', 'No description')}")
Expand All @@ -419,7 +448,7 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:

# Run session with async context manager
async with client:
status, response = await run_agent_session(
status, response, error_info = await run_agent_session(
client, prompt, spec_dir, verbose, phase=current_log_phase
)

Expand Down Expand Up @@ -512,6 +541,9 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:
print_build_complete_banner(spec_dir)
status_manager.update(state=BuildState.COMPLETE)

# Reset error tracking on success
_reset_concurrency_state()

if task_logger:
task_logger.end_phase(
LogPhase.CODING,
Expand All @@ -526,6 +558,9 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:
break

elif status == "continue":
# Reset error tracking on successful session
_reset_concurrency_state()

print(
muted(
f"\nAgent will auto-continue in {AUTO_CONTINUE_DELAY_SECONDS}s..."
Expand Down Expand Up @@ -556,10 +591,106 @@ def _validate_and_fix_implementation_plan() -> tuple[bool, list[str]]:

elif status == "error":
emit_phase(ExecutionPhase.FAILED, "Session encountered an error")
print_status("Session encountered an error", "error")
print(muted("Will retry with a fresh session..."))
status_manager.update(state=BuildState.ERROR)
await asyncio.sleep(AUTO_CONTINUE_DELAY_SECONDS)

# Check if this is a tool concurrency error (400)
is_concurrency_error = (
error_info and error_info.get("type") == "tool_concurrency"
)

if is_concurrency_error:
consecutive_concurrency_errors += 1

# Check if we've exceeded max retries (allow 5 retries with delays: 2s, 4s, 8s, 16s, 32s)
if consecutive_concurrency_errors > MAX_CONCURRENCY_RETRIES:
print_status(
f"Tool concurrency limit hit {consecutive_concurrency_errors} times consecutively",
"error",
)
print()
print("=" * 70)
print(" CRITICAL: Agent stuck in retry loop")
print("=" * 70)
print()
print(
"The agent is repeatedly hitting Claude API's tool concurrency limit."
)
print(
"This usually means the agent is trying to use too many tools at once."
)
print()
print("Possible solutions:")
print(" 1. The agent needs to reduce tool usage per request")
print(" 2. Break down the current subtask into smaller steps")
print(" 3. Manual intervention may be required")
print()
print(f"Error: {error_info.get('message', 'Unknown error')[:200]}")
print()

# Mark current subtask as stuck if we have one
if subtask_id:
recovery_manager.mark_subtask_stuck(
subtask_id,
f"Tool concurrency errors after {consecutive_concurrency_errors} retries",
)
print_status(f"Subtask {subtask_id} marked as STUCK", "error")

status_manager.update(state=BuildState.ERROR)
break # Exit the loop

# Exponential backoff: 2s, 4s, 8s, 16s, 32s
print_status(
f"Tool concurrency error (retry {consecutive_concurrency_errors}/{MAX_CONCURRENCY_RETRIES})",
"warning",
)
print(
muted(
f"Waiting {current_retry_delay}s before retry (exponential backoff)..."
)
)
print()

# Set context for next retry so agent knows to adjust behavior
error_context_message = (
"## CRITICAL: TOOL CONCURRENCY ERROR\n\n"
f"Your previous session hit Claude API's tool concurrency limit (HTTP 400).\n"
f"This is retry {consecutive_concurrency_errors}/{MAX_CONCURRENCY_RETRIES}.\n\n"
"**IMPORTANT: You MUST adjust your approach:**\n"
"1. Use ONE tool at a time - do NOT call multiple tools in parallel\n"
"2. Wait for each tool result before calling the next tool\n"
"3. Avoid starting with `pwd` or multiple Read calls at once\n"
"4. If you need to read multiple files, read them one by one\n"
"5. Take a more incremental, step-by-step approach\n\n"
"Start by focusing on ONE specific action for this subtask."
)
Comment on lines +653 to +664
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This large, multi-line string for the agent's error prompt is defined directly within the main agent loop. For better maintainability and separation of concerns, it is good practice to keep prompt templates out of the core logic.

Consider moving this template to a dedicated prompts module (e.g., prompts.py) and generating it with a helper function. This would make the core agent logic cleaner and centralize prompt management.

For example, in a prompts module:

def generate_concurrency_error_prompt(retry_count: int, max_retries: int) -> str:
    return f'''## CRITICAL: TOOL CONCURRENCY ERROR

Your previous session hit Claude API's tool concurrency limit (HTTP 400).
This is retry {retry_count}/{max_retries}.

**IMPORTANT: You MUST adjust your approach:**
1. Use ONE tool at a time - do NOT call multiple tools in parallel
2. Wait for each tool result before calling the next tool
3. Avoid starting with `pwd` or multiple Read calls at once
4. If you need to read multiple files, read them one by one
5. Take a more incremental, step-by-step approach

Start by focusing on ONE specific action for this subtask.'''


# If we're in planning phase, reset first_run to True so next iteration
# re-enters the planning branch (fix for issue #1565)
if current_log_phase == LogPhase.PLANNING:
first_run = True
planning_retry_context = error_context_message
print_status(
"Planning session failed - will retry planning", "warning"
)
else:
concurrency_error_context = error_context_message

status_manager.update(state=BuildState.ERROR)
await asyncio.sleep(current_retry_delay)

# Double the retry delay for next time (cap at MAX_RETRY_DELAY_SECONDS)
current_retry_delay = min(
current_retry_delay * 2, MAX_RETRY_DELAY_SECONDS
)

else:
# Other errors - use standard retry logic
print_status("Session encountered an error", "error")
print(muted("Will retry with a fresh session..."))
status_manager.update(state=BuildState.ERROR)
await asyncio.sleep(AUTO_CONTINUE_DELAY_SECONDS)

# Reset concurrency error tracking on non-concurrency errors
_reset_concurrency_state()

# Small delay between sessions
if max_iterations is None or iteration < max_iterations:
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/agents/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def run_followup_planner(
try:
# Run single planning session
async with client:
status, response = await run_agent_session(
status, response, error_info = await run_agent_session(
client, prompt, spec_dir, verbose, phase=LogPhase.PLANNING
)

Expand Down
62 changes: 53 additions & 9 deletions apps/backend/agents/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@
logger = logging.getLogger(__name__)


def is_tool_concurrency_error(error: Exception) -> bool:
    """
    Check if an error is a 400 tool concurrency error from Claude API.

    Detection is a case-insensitive heuristic on ``str(error)``: the message
    must contain the status code 400 as a standalone number AND one of the
    tool-concurrency phrasings ("tool" together with "concurrency",
    "too many tools", or "concurrent tool").

    Args:
        error: The exception to check

    Returns:
        True if this is a tool concurrency error, False otherwise
    """
    import re  # Local import so the helper does not depend on file-level imports

    error_str = str(error).lower()

    # Require "400" that is not part of a longer digit run. A plain substring
    # test would also fire on values such as "14003" (a duration or id) or
    # "4000", misclassifying unrelated errors that mention tools/concurrency.
    if not re.search(r"(?<!\d)400(?!\d)", error_str):
        return False

    # Any one of the known phrasings is enough once the status code matched.
    return (
        ("tool" in error_str and "concurrency" in error_str)
        or "too many tools" in error_str
        or "concurrent tool" in error_str
    )


async def post_session_processing(
spec_dir: Path,
project_dir: Path,
Expand Down Expand Up @@ -317,7 +339,7 @@ async def run_agent_session(
spec_dir: Path,
verbose: bool = False,
phase: LogPhase = LogPhase.CODING,
) -> tuple[str, str]:
) -> tuple[str, str, dict]:
"""
Run a single agent session using Claude Agent SDK.

Expand All @@ -329,10 +351,13 @@ async def run_agent_session(
phase: Current execution phase for logging

Returns:
(status, response_text) where status is:
- "continue" if agent should continue working
- "complete" if all subtasks complete
- "error" if an error occurred
(status, response_text, error_info) where:
- status: "continue", "complete", or "error"
- response_text: Agent's response text
- error_info: Dict with error details (empty if no error):
- "type": "tool_concurrency" or "other"
- "message": Error message string
- "exception_type": Exception class name string
"""
debug_section("session", f"Agent Session - {phase.value}")
debug(
Expand Down Expand Up @@ -529,7 +554,7 @@ async def run_agent_session(
tool_count=tool_count,
response_length=len(response_text),
)
return "complete", response_text
return "complete", response_text, {}

debug_success(
"session",
Expand All @@ -538,17 +563,36 @@ async def run_agent_session(
tool_count=tool_count,
response_length=len(response_text),
)
return "continue", response_text
return "continue", response_text, {}

except Exception as e:
# Detect specific error types for better retry handling
is_concurrency = is_tool_concurrency_error(e)
error_type = "tool_concurrency" if is_concurrency else "other"

debug_error(
"session",
f"Session error: {e}",
exception_type=type(e).__name__,
error_category=error_type,
message_count=message_count,
tool_count=tool_count,
)
print(f"Error during agent session: {e}")

# Log concurrency errors prominently
if is_concurrency:
print("\n⚠️ Tool concurrency limit reached (400 error)")
print(" Claude API limits concurrent tool use in a single request")
print(f" Error: {str(e)[:200]}\n")
else:
print(f"Error during agent session: {e}")

if task_logger:
task_logger.log_error(f"Session error: {e}", phase)
return "error", str(e)

error_info = {
"type": error_type,
"message": str(e),
"exception_type": type(e).__name__,
}
return "error", str(e), error_info
17 changes: 13 additions & 4 deletions apps/backend/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,28 @@ def create_client(
(see security.py for ALLOWED_COMMANDS)
4. Tool filtering - Each agent type only sees relevant tools (prevents misuse)
"""
# Get OAuth token - Claude CLI handles token lifecycle internally
oauth_token = require_auth_token()
# Collect env vars to pass to SDK (ANTHROPIC_BASE_URL, CLAUDE_CONFIG_DIR, etc.)
sdk_env = get_sdk_env_vars()

# Get the config dir for profile-specific credential lookup
# CLAUDE_CONFIG_DIR enables per-profile Keychain entries with SHA256-hashed service names
config_dir = sdk_env.get("CLAUDE_CONFIG_DIR")

# Get OAuth token - uses profile-specific Keychain lookup when config_dir is set
# This correctly reads from "Claude Code-credentials-{hash}" for non-default profiles
oauth_token = require_auth_token(config_dir)

# Validate token is not encrypted before passing to SDK
# Encrypted tokens (enc:...) should have been decrypted by require_auth_token()
# If we still have an encrypted token here, it means decryption failed or was skipped
validate_token_not_encrypted(oauth_token)

# Ensure SDK can access it via its expected env var
# This is required because the SDK doesn't know about per-profile Keychain naming
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token

# Collect env vars to pass to SDK (ANTHROPIC_BASE_URL, etc.)
sdk_env = get_sdk_env_vars()
if config_dir:
logger.info(f"Using CLAUDE_CONFIG_DIR for profile: {config_dir}")

# Debug: Log git-bash path detection on Windows
if "CLAUDE_CODE_GIT_BASH_PATH" in sdk_env:
Expand Down
20 changes: 13 additions & 7 deletions apps/backend/core/simple_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""

import logging
import os
from pathlib import Path

from agents.tools_pkg import get_agent_config, get_default_thinking_level
Expand Down Expand Up @@ -72,21 +73,26 @@ def create_simple_client(
Raises:
ValueError: If agent_type is not found in AGENT_CONFIGS
"""
# Get authentication
oauth_token = require_auth_token()
# Get environment variables for SDK (including CLAUDE_CONFIG_DIR if set)
sdk_env = get_sdk_env_vars()

# Get the config dir for profile-specific credential lookup
# CLAUDE_CONFIG_DIR enables per-profile Keychain entries with SHA256-hashed service names
config_dir = sdk_env.get("CLAUDE_CONFIG_DIR")

# Get OAuth token - uses profile-specific Keychain lookup when config_dir is set
# This correctly reads from "Claude Code-credentials-{hash}" for non-default profiles
oauth_token = require_auth_token(config_dir)

# Validate token is not encrypted before passing to SDK
# Encrypted tokens (enc:...) should have been decrypted by require_auth_token()
# If we still have an encrypted token here, it means decryption failed or was skipped
validate_token_not_encrypted(oauth_token)

import os

# Ensure SDK can access it via its expected env var
# This is required because the SDK doesn't know about per-profile Keychain naming
os.environ["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token

# Get environment variables for SDK
sdk_env = get_sdk_env_vars()

# Get agent configuration (raises ValueError if unknown type)
config = get_agent_config(agent_type)

Expand Down
Loading
Loading