diff --git a/e2e-tests/test_agents_and_settings.py b/e2e-tests/test_agents_and_settings.py index 3f6fc80a..38dc05f4 100644 --- a/e2e-tests/test_agents_and_settings.py +++ b/e2e-tests/test_agents_and_settings.py @@ -1,6 +1,7 @@ """End-to-end tests for agents and setting sources with real Claude API calls.""" import asyncio +import json import sys import tempfile from pathlib import Path @@ -15,10 +16,33 @@ ) +def generate_large_agents( + num_agents: int = 20, prompt_size_kb: int = 12 +) -> dict[str, AgentDefinition]: + """Generate multiple agents with large prompts for testing. + + Args: + num_agents: Number of agents to generate + prompt_size_kb: Size of each agent's prompt in KB + + Returns: + Dictionary of agent name -> AgentDefinition + """ + agents = {} + for i in range(num_agents): + # Generate a large prompt with some structure + prompt_content = f"You are test agent #{i}. " + ("x" * (prompt_size_kb * 1024)) + agents[f"large-agent-{i}"] = AgentDefinition( + description=f"Large test agent #{i} for stress testing", + prompt=prompt_content, + ) + return agents + + @pytest.mark.e2e @pytest.mark.asyncio async def test_agent_definition(): - """Test that custom agent definitions work.""" + """Test that custom agent definitions work in streaming mode.""" options = ClaudeAgentOptions( agents={ "test-agent": AgentDefinition( @@ -47,6 +71,74 @@ async def test_agent_definition(): break +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_agent_definition_with_query_function(): + """Test that custom agent definitions work with the query() function. + + Both ClaudeSDKClient and query() now use streaming mode internally, + sending agents via the initialize request. + """ + from claude_agent_sdk import query + + options = ClaudeAgentOptions( + agents={ + "test-agent-query": AgentDefinition( + description="A test agent for query function verification", + prompt="You are a test agent.", + ) + }, + max_turns=1, + ) + + # Use query() with string prompt + found_agent = False + async for message in query(prompt="What is 2 + 2?", options=options): + if isinstance(message, SystemMessage) and message.subtype == "init": + agents = message.data.get("agents", []) + assert "test-agent-query" in agents, ( + f"test-agent-query should be available, got: {agents}" + ) + found_agent = True + break + + assert found_agent, "Should have received init message with agents" + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_large_agents_with_query_function(): + """Test large agent definitions (260KB+) work with query() function. + + Since we now always use streaming mode internally (matching TypeScript SDK), + large agents are sent via the initialize request through stdin with no + size limits. + """ + from claude_agent_sdk import query + + # Generate 20 agents with 13KB prompts each = ~260KB total + agents = generate_large_agents(num_agents=20, prompt_size_kb=13) + + options = ClaudeAgentOptions( + agents=agents, + max_turns=1, + ) + + # Use query() with string prompt - agents still go via initialize + found_agents = [] + async for message in query(prompt="What is 2 + 2?", options=options): + if isinstance(message, SystemMessage) and message.subtype == "init": + found_agents = message.data.get("agents", []) + break + + # Check all our agents are registered + for agent_name in agents: + assert agent_name in found_agents, ( + f"{agent_name} should be registered. " + f"Found: {found_agents[:5]}... ({len(found_agents)} total)" + ) + + @pytest.mark.e2e @pytest.mark.asyncio async def test_filesystem_agent_loading(): @@ -240,3 +332,62 @@ async def test_setting_sources_project_included(): # On Windows, wait for file handles to be released before cleanup if sys.platform == "win32": await asyncio.sleep(0.5) + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_large_agent_definitions_via_initialize(): + """Test that large agent definitions (250KB+) are sent via initialize request. + + This test verifies the fix for the issue where large agent definitions + would previously trigger a temp file workaround with @filepath. Now they + are sent via the initialize control request through stdin, which has no + size limit. + + The test: + 1. Generates 20 agents with ~13KB prompts each (~260KB total) + 2. Creates an SDK client with these agents + 3. Verifies all agents are registered and available + """ + from dataclasses import asdict + + # Generate 20 agents with 13KB prompts each = ~260KB total + agents = generate_large_agents(num_agents=20, prompt_size_kb=13) + + # Calculate total size to verify we're testing the right thing + total_size = sum( + len(json.dumps({k: v for k, v in asdict(agent).items() if v is not None})) + for agent in agents.values() + ) + assert total_size > 250_000, ( + f"Test agents should be >250KB, got {total_size / 1024:.1f}KB" + ) + + options = ClaudeAgentOptions( + agents=agents, + max_turns=1, + ) + + async with ClaudeSDKClient(options=options) as client: + await client.query("List available agents") + + # Check that all agents are available in init message + async for message in client.receive_response(): + if isinstance(message, SystemMessage) and message.subtype == "init": + registered_agents = message.data.get("agents", []) + assert isinstance(registered_agents, list), ( + f"agents should be a list, got: {type(registered_agents)}" + ) + + # Verify all our agents are registered + for agent_name in agents: + assert agent_name in registered_agents, ( + f"{agent_name} should be registered. " + f"Found: {registered_agents[:5]}... ({len(registered_agents)} total)" + ) + + # All agents should be there + assert len(registered_agents) >= len(agents), ( + f"Expected at least {len(agents)} agents, got {len(registered_agents)}" + ) + break diff --git a/src/claude_agent_sdk/_internal/client.py b/src/claude_agent_sdk/_internal/client.py index 52466272..90f535fb 100644 --- a/src/claude_agent_sdk/_internal/client.py +++ b/src/claude_agent_sdk/_internal/client.py @@ -1,7 +1,7 @@ """Internal client implementation.""" from collections.abc import AsyncIterable, AsyncIterator -from dataclasses import replace +from dataclasses import asdict, replace from typing import Any from ..types import ( @@ -89,32 +89,52 @@ async def process_query( if isinstance(config, dict) and config.get("type") == "sdk": sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] + # Convert agents to dict format for initialize request + agents_dict = None + if configured_options.agents: + agents_dict = { + name: {k: v for k, v in asdict(agent_def).items() if v is not None} + for name, agent_def in configured_options.agents.items() + } + # Create Query to handle control protocol - is_streaming = not isinstance(prompt, str) + # Always use streaming mode internally (matching TypeScript SDK) + # This ensures agents are always sent via initialize request query = Query( transport=chosen_transport, - is_streaming_mode=is_streaming, + is_streaming_mode=True, # Always streaming internally can_use_tool=configured_options.can_use_tool, hooks=self._convert_hooks_to_internal_format(configured_options.hooks) if configured_options.hooks else None, sdk_mcp_servers=sdk_mcp_servers, + agents=agents_dict, ) try: # Start reading messages await query.start() - # Initialize if streaming - if is_streaming: - await query.initialize() + # Always initialize to send agents via stdin (matching TypeScript SDK) + await query.initialize() - # Stream input if it's an AsyncIterable - if isinstance(prompt, AsyncIterable) and query._tg: - # Start streaming in background - # Create a task that will run in the background + # Handle prompt input + if isinstance(prompt, str): + # For string prompts, write user message to stdin after initialize + # (matching TypeScript SDK behavior) + import json + + user_message = { + "type": "user", + "session_id": "", + "message": {"role": "user", "content": prompt}, + "parent_tool_use_id": None, + } + await chosen_transport.write(json.dumps(user_message) + "\n") + await chosen_transport.end_input() + elif isinstance(prompt, AsyncIterable) and query._tg: + # Stream input in background for async iterables query._tg.start_soon(query.stream_input, prompt) - # For string prompts, the prompt is already passed via CLI args # Yield parsed messages async for data in query.receive_messages(): diff --git a/src/claude_agent_sdk/_internal/query.py b/src/claude_agent_sdk/_internal/query.py index c30fc159..729cd962 100644 --- a/src/claude_agent_sdk/_internal/query.py +++ b/src/claude_agent_sdk/_internal/query.py @@ -73,6 +73,7 @@ def __init__( hooks: dict[str, list[dict[str, Any]]] | None = None, sdk_mcp_servers: dict[str, "McpServer"] | None = None, initialize_timeout: float = 60.0, + agents: dict[str, dict[str, Any]] | None = None, ): """Initialize Query with transport and callbacks. @@ -83,6 +84,7 @@ def __init__( hooks: Optional hook configurations sdk_mcp_servers: Optional SDK MCP server instances initialize_timeout: Timeout in seconds for the initialize request + agents: Optional agent definitions to send via initialize """ self._initialize_timeout = initialize_timeout self.transport = transport @@ -90,6 +92,7 @@ def __init__( self.can_use_tool = can_use_tool self.hooks = hooks or {} self.sdk_mcp_servers = sdk_mcp_servers or {} + self._agents = agents # Control protocol state self.pending_control_responses: dict[str, anyio.Event] = {} @@ -144,10 +147,12 @@ async def initialize(self) -> dict[str, Any] | None: hooks_config[event].append(hook_matcher_config) # Send initialize request - request = { + request: dict[str, Any] = { "subtype": "initialize", "hooks": hooks_config if hooks_config else None, } + if self._agents: + request["agents"] = self._agents # Use longer timeout for initialize since MCP servers may take time to start response = await self._send_control_request( diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index a4882db1..3797cf9e 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -7,10 +7,8 @@ import re import shutil import sys -import tempfile from collections.abc import AsyncIterable, AsyncIterator from contextlib import suppress -from dataclasses import asdict from pathlib import Path from subprocess import PIPE from typing import Any @@ -31,11 +29,6 @@ _DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit MINIMUM_CLAUDE_CODE_VERSION = "2.0.0" -# Platform-specific command line length limits -# Windows cmd.exe has a limit of 8191 characters, use 8000 for safety -# Other platforms have much higher limits -_CMD_LENGTH_LIMIT = 8000 if platform.system() == "Windows" else 100000 - class SubprocessCLITransport(Transport): """Subprocess transport using Claude Code CLI.""" @@ -46,7 +39,9 @@ def __init__( options: ClaudeAgentOptions, ): self._prompt = prompt - self._is_streaming = not isinstance(prompt, str) + # Always use streaming mode internally (matching TypeScript SDK) + # This allows agents and other large configs to be sent via initialize request + self._is_streaming = True self._options = options self._cli_path = ( str(options.cli_path) if options.cli_path is not None else self._find_cli() @@ -64,7 +59,6 @@ def __init__( if options.max_buffer_size is not None else _DEFAULT_MAX_BUFFER_SIZE ) - self._temp_files: list[str] = [] # Track temporary files for cleanup self._write_lock: anyio.Lock = anyio.Lock() def _find_cli(self) -> str: @@ -276,13 +270,8 @@ def _build_command(self) -> list[str]: if self._options.fork_session: cmd.append("--fork-session") - if self._options.agents: - agents_dict = { - name: {k: v for k, v in asdict(agent_def).items() if v is not None} - for name, agent_def in self._options.agents.items() - } - agents_json = json.dumps(agents_dict) - cmd.extend(["--agents", agents_json]) + # Agents are always sent via initialize request (matching TypeScript SDK) + # No --agents CLI flag needed sources_value = ( ",".join(self._options.setting_sources) @@ -324,45 +313,9 @@ def _build_command(self) -> list[str]: if schema is not None: cmd.extend(["--json-schema", json.dumps(schema)]) - # Add prompt handling based on mode - # IMPORTANT: This must come AFTER all flags because everything after "--" is treated as arguments - if self._is_streaming: - # Streaming mode: use --input-format stream-json - cmd.extend(["--input-format", "stream-json"]) - else: - # String mode: use --print with the prompt - cmd.extend(["--print", "--", str(self._prompt)]) - - # Check if command line is too long (Windows limitation) - cmd_str = " ".join(cmd) - if len(cmd_str) > _CMD_LENGTH_LIMIT and self._options.agents: - # Command is too long - use temp file for agents - # Find the --agents argument and replace its value with @filepath - try: - agents_idx = cmd.index("--agents") - agents_json_value = cmd[agents_idx + 1] - - # Create a temporary file - # ruff: noqa: SIM115 - temp_file = tempfile.NamedTemporaryFile( - mode="w", suffix=".json", delete=False, encoding="utf-8" - ) - temp_file.write(agents_json_value) - temp_file.close() - - # Track for cleanup - self._temp_files.append(temp_file.name) - - # Replace agents JSON with @filepath reference - cmd[agents_idx + 1] = f"@{temp_file.name}" - - logger.info( - f"Command line length ({len(cmd_str)}) exceeds limit ({_CMD_LENGTH_LIMIT}). " - f"Using temp file for --agents: {temp_file.name}" - ) - except (ValueError, IndexError) as e: - # This shouldn't happen, but log it just in case - logger.warning(f"Failed to optimize command line length: {e}") + # Always use streaming mode with stdin (matching TypeScript SDK) + # This allows agents and other large configs to be sent via initialize request + cmd.extend(["--input-format", "stream-json"]) return cmd @@ -421,12 +374,9 @@ async def connect(self) -> None: await self._stderr_task_group.__aenter__() self._stderr_task_group.start_soon(self._handle_stderr) - # Setup stdin for streaming mode - if self._is_streaming and self._process.stdin: + # Setup stdin for streaming (always used now) + if self._process.stdin: self._stdin_stream = TextSendStream(self._process.stdin) - elif not self._is_streaming and self._process.stdin: - # String mode: close stdin immediately - await self._process.stdin.aclose() self._ready = True @@ -476,12 +426,6 @@ async def _handle_stderr(self) -> None: async def close(self) -> None: """Close the transport and clean up resources.""" - # Clean up temporary files first (before early return) - for temp_file in self._temp_files: - with suppress(Exception): - Path(temp_file).unlink(missing_ok=True) - self._temp_files.clear() - if not self._process: self._ready = False return diff --git a/src/claude_agent_sdk/client.py b/src/claude_agent_sdk/client.py index 18ab818d..1d6a2456 100644 --- a/src/claude_agent_sdk/client.py +++ b/src/claude_agent_sdk/client.py @@ -3,7 +3,7 @@ import json import os from collections.abc import AsyncIterable, AsyncIterator -from dataclasses import replace +from dataclasses import asdict, replace from typing import Any from . import Transport @@ -147,6 +147,14 @@ async def _empty_stream() -> AsyncIterator[dict[str, Any]]: ) initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0) + # Convert agents to dict format for initialize request + agents_dict: dict[str, dict[str, Any]] | None = None + if self.options.agents: + agents_dict = { + name: {k: v for k, v in asdict(agent_def).items() if v is not None} + for name, agent_def in self.options.agents.items() + } + # Create Query to handle control protocol self._query = Query( transport=self._transport, @@ -157,6 +165,7 @@ async def _empty_stream() -> AsyncIterator[dict[str, Any]]: else None, sdk_mcp_servers=sdk_mcp_servers, initialize_timeout=initialize_timeout, + agents=agents_dict, ) # Start reading messages and initialize diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 9c09345f..3e590b5d 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -697,6 +697,7 @@ class SDKControlPermissionRequest(TypedDict): class SDKControlInitializeRequest(TypedDict): subtype: Literal["initialize"] hooks: dict[HookEvent, Any] | None + agents: NotRequired[dict[str, dict[str, Any]]] class SDKControlSetPermissionModeRequest(TypedDict): diff --git a/tests/test_client.py b/tests/test_client.py index 39c32895..b80bb3f2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -75,9 +75,15 @@ def test_query_with_cwd(self): """Test query with custom working directory.""" async def _test(): - with patch( - "claude_agent_sdk._internal.client.SubprocessCLITransport" - ) as mock_transport_class: + with ( + patch( + "claude_agent_sdk._internal.client.SubprocessCLITransport" + ) as mock_transport_class, + patch( + "claude_agent_sdk._internal.query.Query.initialize", + new_callable=AsyncMock, + ), + ): mock_transport = AsyncMock() mock_transport_class.return_value = mock_transport diff --git a/tests/test_integration.py b/tests/test_integration.py index 1f237dcc..5b434546 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -25,9 +25,15 @@ def test_simple_query_response(self): """Test a simple query with text response.""" async def _test(): - with patch( - "claude_agent_sdk._internal.client.SubprocessCLITransport" - ) as mock_transport_class: + with ( + patch( + "claude_agent_sdk._internal.client.SubprocessCLITransport" + ) as mock_transport_class, + patch( + "claude_agent_sdk._internal.query.Query.initialize", + new_callable=AsyncMock, + ), + ): mock_transport = AsyncMock() mock_transport_class.return_value = mock_transport @@ -83,9 +89,15 @@ def test_query_with_tool_use(self): """Test query that uses tools.""" async def _test(): - with patch( - "claude_agent_sdk._internal.client.SubprocessCLITransport" - ) as mock_transport_class: + with ( + patch( + "claude_agent_sdk._internal.client.SubprocessCLITransport" + ) as mock_transport_class, + patch( + "claude_agent_sdk._internal.query.Query.initialize", + new_callable=AsyncMock, + ), + ): mock_transport = AsyncMock() mock_transport_class.return_value = mock_transport @@ -169,9 +181,15 @@ def test_continuation_option(self): """Test query with continue_conversation option.""" async def _test(): - with patch( - "claude_agent_sdk._internal.client.SubprocessCLITransport" - ) as mock_transport_class: + with ( + patch( + "claude_agent_sdk._internal.client.SubprocessCLITransport" + ) as mock_transport_class, + patch( + "claude_agent_sdk._internal.query.Query.initialize", + new_callable=AsyncMock, + ), + ): mock_transport = AsyncMock() mock_transport_class.return_value = mock_transport @@ -217,9 +235,15 @@ def test_max_budget_usd_option(self): """Test query with max_budget_usd option.""" async def _test(): - with patch( - "claude_agent_sdk._internal.client.SubprocessCLITransport" - ) as mock_transport_class: + with ( + patch( + "claude_agent_sdk._internal.client.SubprocessCLITransport" + ) as mock_transport_class, + patch( + "claude_agent_sdk._internal.query.Query.initialize", + new_callable=AsyncMock, + ), + ): mock_transport = AsyncMock() mock_transport_class.return_value = mock_transport diff --git a/tests/test_transport.py b/tests/test_transport.py index fe9b6b22..65e6ada7 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -44,8 +44,10 @@ def test_build_command_basic(self): assert cmd[0] == "/usr/bin/claude" assert "--output-format" in cmd assert "stream-json" in cmd - assert "--print" in cmd - assert "Hello" in cmd + # Always use streaming mode (matching TypeScript SDK) + assert "--input-format" in cmd + assert "--print" not in cmd # Never use --print anymore + # Prompt is sent via stdin, not CLI args assert "--system-prompt" in cmd assert cmd[cmd.index("--system-prompt") + 1] == "" @@ -826,3 +828,87 @@ async def do_write(i: int): await process.wait() anyio.run(_test, backend="trio") + + def test_build_command_agents_always_via_initialize(self): + """Test that --agents is NEVER passed via CLI. + + Matching TypeScript SDK behavior, agents are always sent via the + initialize request through stdin, regardless of prompt type. + """ + from claude_agent_sdk.types import AgentDefinition + + agents = { + "test-agent": AgentDefinition( + description="A test agent", + prompt="You are a test agent", + ) + } + + # Test with string prompt + transport = SubprocessCLITransport( + prompt="Hello", + options=make_options(agents=agents), + ) + cmd = transport._build_command() + assert "--agents" not in cmd + assert "--input-format" in cmd + assert "stream-json" in cmd + + # Test with async iterable prompt + async def fake_stream(): + yield {"type": "user", "message": {"role": "user", "content": "test"}} + + transport2 = SubprocessCLITransport( + prompt=fake_stream(), + options=make_options(agents=agents), + ) + cmd2 = transport2._build_command() + assert "--agents" not in cmd2 + assert "--input-format" in cmd2 + assert "stream-json" in cmd2 + + def test_build_command_always_uses_streaming(self): + """Test that streaming mode is always used, even for string prompts. + + Matching TypeScript SDK behavior, we always use --input-format stream-json + so that agents and other large configs can be sent via initialize request. + """ + # String prompt should still use streaming + transport = SubprocessCLITransport( + prompt="Hello", + options=make_options(), + ) + cmd = transport._build_command() + assert "--input-format" in cmd + assert "stream-json" in cmd + assert "--print" not in cmd + + def test_build_command_large_agents_work(self): + """Test that large agent definitions work without size limits. + + Since agents are sent via initialize request through stdin, + there are no ARG_MAX or command line length limits. + """ + from claude_agent_sdk.types import AgentDefinition + + # Create a large agent definition (50KB prompt) + large_prompt = "x" * 50000 + agents = { + "large-agent": AgentDefinition( + description="A large agent", + prompt=large_prompt, + ) + } + + transport = SubprocessCLITransport( + prompt="Hello", + options=make_options(agents=agents), + ) + + cmd = transport._build_command() + + # --agents should not be in command (sent via initialize) + assert "--agents" not in cmd + # No @filepath references should exist + cmd_str = " ".join(cmd) + assert "@" not in cmd_str