From 48c8cc746022ee463082203ae994d4510ddef057 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 31 Dec 2025 13:10:33 +0000 Subject: [PATCH 1/7] change test running for speed --- tests/integration/acp/conftest.py | 298 ++++++++++ tests/integration/acp/test_acp_basic.py | 274 ++++----- .../acp/test_acp_content_blocks.py | 521 ++++++++---------- .../acp/test_acp_filesystem_toolcall.py | 178 +++--- tests/integration/acp/test_acp_permissions.py | 389 ++++++------- .../acp/test_acp_runtime_telemetry.py | 345 +++++------- tests/integration/acp/test_acp_status_line.py | 57 +- tests/integration/acp/test_acp_terminal.py | 257 ++++----- .../acp/test_acp_tool_notifications.py | 294 ++++------ tests/integration/acp/test_client.py | 9 + 10 files changed, 1241 insertions(+), 1381 deletions(-) create mode 100644 tests/integration/acp/conftest.py diff --git a/tests/integration/acp/conftest.py b/tests/integration/acp/conftest.py new file mode 100644 index 000000000..47d74706b --- /dev/null +++ b/tests/integration/acp/conftest.py @@ -0,0 +1,298 @@ +from __future__ import annotations + +import sys +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any, AsyncIterator + +import pytest_asyncio +from acp.schema import ClientCapabilities, FileSystemCapability, Implementation +from acp.stdio import spawn_agent_process + +TEST_DIR = Path(__file__).parent +if str(TEST_DIR) not in sys.path: + sys.path.append(str(TEST_DIR)) + +from test_client import TestClient # noqa: E402 + +CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" + + +def _fast_agent_cmd( + name: str, + *, + servers: tuple[str, ...] = (), + no_permissions: bool = False, + shell: bool = False, +) -> tuple[str, ...]: + cmd = [ + sys.executable, + "-m", + "fast_agent.cli", + "serve", + "--config-path", + str(CONFIG_PATH), + "--transport", + "acp", + "--model", + "passthrough", + "--name", + name, + ] + if servers: + cmd.extend(["--servers", *servers]) + if no_permissions: + cmd.append("--no-permissions") + if shell: + cmd.append("--shell") + return tuple(cmd) + + +@asynccontextmanager +async def _spawn_initialized_agent( + cmd: tuple[str, ...], + *, + terminal: bool, + fs_read: bool = True, + fs_write: bool = True, + client_name: str = "pytest-client", + client_version: str = "0.0.1", +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + client = TestClient() + async with spawn_agent_process(lambda _: client, *cmd) as (connection, _process): + init_response = await connection.initialize( + protocol_version=1, + client_capabilities=ClientCapabilities( + fs=FileSystemCapability(read_text_file=fs_read, write_text_file=fs_write), + terminal=terminal, + ), + client_info=Implementation(name=client_name, version=client_version), + ) + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_basic_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd("fast-agent-acp-test") + async with _spawn_initialized_agent(cmd, terminal=False) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_basic( + acp_basic_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_basic_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_content_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd("fast-agent-acp-content-test") + async with _spawn_initialized_agent(cmd, terminal=False) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_content( + acp_content_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_content_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_filesystem_toolcall_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-filesystem-toolcall-test", + no_permissions=True, + ) + async with _spawn_initialized_agent( + cmd, + terminal=False, + client_name="pytest-filesystem-client", + ) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_filesystem_toolcall( + acp_filesystem_toolcall_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_filesystem_toolcall_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_permissions_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-test", + servers=("progress_test",), + ) + async with _spawn_initialized_agent(cmd, terminal=False) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_permissions( + acp_permissions_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_permissions_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_permissions_no_perms_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-test", + servers=("progress_test",), + no_permissions=True, + ) + async with _spawn_initialized_agent(cmd, terminal=False) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_permissions_no_perms( + acp_permissions_no_perms_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_permissions_no_perms_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_runtime_telemetry_shell_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-runtime-telemetry-test", + no_permissions=True, + shell=True, + ) + async with _spawn_initialized_agent( + cmd, + terminal=True, + client_name="pytest-telemetry-client", + ) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_runtime_telemetry_shell( + acp_runtime_telemetry_shell_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_runtime_telemetry_shell_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_runtime_telemetry_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-runtime-telemetry-test", + no_permissions=True, + ) + async with _spawn_initialized_agent( + cmd, + terminal=False, + client_name="pytest-telemetry-client", + ) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_runtime_telemetry( + acp_runtime_telemetry_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_runtime_telemetry_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_tool_notifications_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-test", + servers=("progress_test",), + no_permissions=True, + ) + async with _spawn_initialized_agent(cmd, terminal=False) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_tool_notifications( + acp_tool_notifications_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_tool_notifications_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_terminal_shell_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-terminal-test", + shell=True, + ) + async with _spawn_initialized_agent( + cmd, + terminal=True, + client_name="pytest-terminal-client", + ) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_terminal_shell( + acp_terminal_shell_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_terminal_shell_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_terminal_no_shell_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd("fast-agent-acp-terminal-test") + async with _spawn_initialized_agent( + cmd, + terminal=True, + client_name="pytest-terminal-client", + ) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_terminal_no_shell( + acp_terminal_no_shell_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_terminal_no_shell_process + client.reset() + yield connection, client, init_response + + +@pytest_asyncio.fixture(scope="module", loop_scope="module") +async def acp_terminal_client_unsupported_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: + cmd = _fast_agent_cmd( + "fast-agent-acp-terminal-test", + shell=True, + ) + async with _spawn_initialized_agent( + cmd, + terminal=False, + client_name="pytest-terminal-client", + ) as harness: + yield harness + + +@pytest_asyncio.fixture +async def acp_terminal_client_unsupported( + acp_terminal_client_unsupported_process: tuple[Any, TestClient, Any], +) -> AsyncIterator[tuple[Any, TestClient, Any]]: + connection, client, init_response = acp_terminal_client_unsupported_process + client.reset() + yield connection, client, init_response diff --git a/tests/integration/acp/test_acp_basic.py b/tests/integration/acp/test_acp_basic.py index 5ce9f6086..57ab18820 100644 --- a/tests/integration/acp/test_acp_basic.py +++ b/tests/integration/acp/test_acp_basic.py @@ -7,8 +7,7 @@ import pytest from acp.helpers import text_block -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, StopReason -from acp.stdio import spawn_agent_process +from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: @@ -16,84 +15,62 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" -FAST_AGENT_CMD = ( - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--model", - "passthrough", - "--name", - "fast-agent-acp-test", -) @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_initialize_and_prompt_roundtrip() -> None: +async def test_acp_initialize_and_prompt_roundtrip( + acp_basic: tuple[object, TestClient, object], +) -> None: """Ensure the ACP transport initializes, creates a session, and echoes prompts.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - assert init_response.protocol_version == 1 - assert init_response.agent_capabilities is not None - assert init_response.agent_info.name == "fast-agent-acp-test" - # AgentCapabilities schema changed upstream; ensure we advertised prompt support. - prompt_caps = getattr(init_response.agent_capabilities, "prompts", None) or getattr( - init_response.agent_capabilities, "prompt_capabilities", None - ) - assert prompt_caps is not None - - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = session_response.session_id - assert session_id - - prompt_text = "echo from ACP integration test" - prompt_response = await connection.prompt( - session_id=session_id, prompt=[text_block(prompt_text)] - ) - assert prompt_response.stop_reason == END_TURN - - await _wait_for_notifications(client) - - # TestClient now stores notifications as dicts with session_id and update keys - # Find the agent_message_chunk notification (may not be the last one due to commands update) - # The update can be either an object with sessionUpdate attr or a dict with sessionUpdate key - def get_session_update_type(update: Any) -> str | None: - if hasattr(update, "sessionUpdate"): - return update.sessionUpdate - if isinstance(update, dict): - return update.get("sessionUpdate") - return None - - message_updates = [ - n - for n in client.notifications - if n["session_id"] == session_id - and get_session_update_type(n["update"]) == "agent_message_chunk" - ] - assert message_updates, ( - f"Expected agent_message_chunk, got: {[get_session_update_type(n['update']) for n in client.notifications]}" - ) - update = message_updates[-1]["update"] - # Passthrough model mirrors user input, so the agent content should match the prompt. - content = update.content if hasattr(update, "content") else update.get("content") - assert getattr(content, "text", None) == prompt_text + connection, client, init_response = acp_basic + + assert init_response.protocol_version == 1 + assert init_response.agent_capabilities is not None + assert init_response.agent_info.name == "fast-agent-acp-test" + # AgentCapabilities schema changed upstream; ensure we advertised prompt support. + prompt_caps = getattr(init_response.agent_capabilities, "prompts", None) or getattr( + init_response.agent_capabilities, "prompt_capabilities", None + ) + assert prompt_caps is not None + + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = session_response.session_id + assert session_id + + prompt_text = "echo from ACP integration test" + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) + assert prompt_response.stop_reason == END_TURN + + await _wait_for_notifications(client) + + # TestClient now stores notifications as dicts with session_id and update keys + # Find the agent_message_chunk notification (may not be the last one due to commands update) + # The update can be either an object with sessionUpdate attr or a dict with sessionUpdate key + def get_session_update_type(update: Any) -> str | None: + if hasattr(update, "sessionUpdate"): + return update.sessionUpdate + if isinstance(update, dict): + return update.get("sessionUpdate") + return None + + message_updates = [ + n + for n in client.notifications + if n["session_id"] == session_id + and get_session_update_type(n["update"]) == "agent_message_chunk" + ] + assert message_updates, ( + f"Expected agent_message_chunk, got: {[get_session_update_type(n['update']) for n in client.notifications]}" + ) + update = message_updates[-1]["update"] + # Passthrough model mirrors user input, so the agent content should match the prompt. + content = update.content if hasattr(update, "content") else update.get("content") + assert getattr(content, "text", None) == prompt_text async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> None: @@ -108,49 +85,39 @@ async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> N @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_session_modes_included_in_new_session() -> None: +async def test_acp_session_modes_included_in_new_session( + acp_basic: tuple[object, TestClient, object], +) -> None: """Test that session/new response includes modes field.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - assert init_response.protocolVersion == 1 - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = session_response.session_id - assert session_id - - # Verify modes are included in the response - assert hasattr(session_response, "modes"), "NewSessionResponse should include modes field" - assert session_response.modes is not None, "Modes should not be None" - - # Verify modes structure - modes = session_response.modes - assert hasattr(modes, "availableModes"), "SessionModeState should have availableModes" - assert hasattr(modes, "currentModeId"), "SessionModeState should have currentModeId" - assert len(modes.availableModes) > 0, "Should have at least one available mode" - assert modes.currentModeId, "Should have a current mode set" - - # Verify the current mode is in available modes - available_mode_ids = [mode.id for mode in modes.availableModes] - assert modes.currentModeId in available_mode_ids, ( - "Current mode should be in available modes" - ) + connection, _client, init_response = acp_basic + + assert init_response.protocolVersion == 1 + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = session_response.session_id + assert session_id + + # Verify modes are included in the response + assert hasattr(session_response, "modes"), "NewSessionResponse should include modes field" + assert session_response.modes is not None, "Modes should not be None" + + # Verify modes structure + modes = session_response.modes + assert hasattr(modes, "availableModes"), "SessionModeState should have availableModes" + assert hasattr(modes, "currentModeId"), "SessionModeState should have currentModeId" + assert len(modes.availableModes) > 0, "Should have at least one available mode" + assert modes.currentModeId, "Should have a current mode set" + + # Verify the current mode is in available modes + available_mode_ids = [mode.id for mode in modes.availableModes] + assert modes.currentModeId in available_mode_ids, "Current mode should be in available modes" @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_overlapping_prompts_are_refused() -> None: +async def test_acp_overlapping_prompts_are_refused( + acp_basic: tuple[object, TestClient, object], +) -> None: """ Test that overlapping prompt requests for the same session are refused. @@ -158,48 +125,37 @@ async def test_acp_overlapping_prompts_are_refused() -> None: If a second prompt arrives while one is in progress, it should be immediately refused with stopReason="refusal". """ - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - assert init_response.protocolVersion == 1 - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = session_response.session_id - assert session_id - - # Send two prompts truly concurrently (no sleep between them) - # This ensures they both arrive before either completes - prompt1_task = asyncio.create_task( - connection.prompt(session_id=session_id, prompt=[text_block("first prompt")]) - ) - - # Send immediately without waiting - ensures actual overlap - prompt2_task = asyncio.create_task( - connection.prompt(session_id=session_id, prompt=[text_block("overlapping prompt")]) - ) - - # Wait for both to complete - prompt1_response, prompt2_response = await asyncio.gather(prompt1_task, prompt2_task) - - # One should succeed, one should be refused - # (We don't know which one arrives first due to async scheduling) - responses = [prompt1_response.stopReason, prompt2_response.stopReason] - assert "end_turn" in responses, "One prompt should succeed" - assert "refusal" in responses, "One prompt should be refused" - - # After both complete, a new prompt should succeed - prompt3_response = await connection.prompt( - session_id=session_id, prompt=[text_block("third prompt")] - ) - assert prompt3_response.stopReason == END_TURN + connection, _client, init_response = acp_basic + + assert init_response.protocolVersion == 1 + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = session_response.session_id + assert session_id + + # Send two prompts truly concurrently (no sleep between them) + # This ensures they both arrive before either completes + prompt1_task = asyncio.create_task( + connection.prompt(session_id=session_id, prompt=[text_block("first prompt")]) + ) + + # Send immediately without waiting - ensures actual overlap + prompt2_task = asyncio.create_task( + connection.prompt(session_id=session_id, prompt=[text_block("overlapping prompt")]) + ) + + # Wait for both to complete + prompt1_response, prompt2_response = await asyncio.gather(prompt1_task, prompt2_task) + + # One should succeed, one should be refused + # (We don't know which one arrives first due to async scheduling) + responses = [prompt1_response.stopReason, prompt2_response.stopReason] + assert "end_turn" in responses, "One prompt should succeed" + assert "refusal" in responses, "One prompt should be refused" + + # After both complete, a new prompt should succeed + prompt3_response = await connection.prompt( + session_id=session_id, prompt=[text_block("third prompt")] + ) + assert prompt3_response.stopReason == END_TURN diff --git a/tests/integration/acp/test_acp_content_blocks.py b/tests/integration/acp/test_acp_content_blocks.py index 0f2207ebe..7997f4834 100644 --- a/tests/integration/acp/test_acp_content_blocks.py +++ b/tests/integration/acp/test_acp_content_blocks.py @@ -15,15 +15,11 @@ from acp.helpers import text_block from acp.schema import ( BlobResourceContents, - ClientCapabilities, EmbeddedResourceContentBlock, - FileSystemCapability, ImageContentBlock, - Implementation, StopReason, TextResourceContents, ) -from acp.stdio import spawn_agent_process TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: @@ -31,338 +27,275 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" -FAST_AGENT_CMD = ( - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--model", - "passthrough", - "--name", - "fast-agent-acp-content-test", -) @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_image_content_processing() -> None: +async def test_acp_image_content_processing( + acp_content: tuple[object, TestClient, object], +) -> None: """Test that image content blocks are properly processed.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Check that image is advertised as supported - agent_caps = getattr(init_response, "agent_capabilities", None) or getattr( - init_response, "agentCapabilities", None - ) - assert agent_caps is not None - # Handle both "prompts" and "promptCapabilities" field names - prompt_caps = getattr( - agent_caps, "prompts", None - ) or getattr(agent_caps, "promptCapabilities", None) - assert prompt_caps is not None - # Check if image capability is enabled - assert getattr(prompt_caps, "image", False) is True - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = getattr(session_response, "session_id", None) or getattr( - session_response, "sessionId", None - ) - assert session_id - - # Create a fake image (base64 encoded) - fake_image_data = base64.b64encode(b"fake-image-data").decode("utf-8") - - # Send prompt with text and image - prompt_blocks = [ - text_block("Analyze this image:"), - ImageContentBlock( - type="image", - data=fake_image_data, - mime_type="image/png", - ), - ] - - prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) - - # Should complete successfully - stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( - prompt_response, "stopReason", None - ) - assert stop_reason == END_TURN - - # Wait for notifications - await _wait_for_notifications(client) - - # Verify we got a response (passthrough model will echo something back) - assert len(client.notifications) > 0 - last_update = client.notifications[-1] - assert last_update["session_id"] == session_id + connection, client, init_response = acp_content + + # Check that image is advertised as supported + agent_caps = getattr(init_response, "agent_capabilities", None) or getattr( + init_response, "agentCapabilities", None + ) + assert agent_caps is not None + # Handle both "prompts" and "promptCapabilities" field names + prompt_caps = getattr(agent_caps, "prompts", None) or getattr( + agent_caps, "promptCapabilities", None + ) + assert prompt_caps is not None + # Check if image capability is enabled + assert getattr(prompt_caps, "image", False) is True + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = getattr(session_response, "session_id", None) or getattr( + session_response, "sessionId", None + ) + assert session_id + + # Create a fake image (base64 encoded) + fake_image_data = base64.b64encode(b"fake-image-data").decode("utf-8") + + # Send prompt with text and image + prompt_blocks = [ + text_block("Analyze this image:"), + ImageContentBlock( + type="image", + data=fake_image_data, + mime_type="image/png", + ), + ] + + prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) + + # Should complete successfully + stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( + prompt_response, "stopReason", None + ) + assert stop_reason == END_TURN + + # Wait for notifications + await _wait_for_notifications(client) + + # Verify we got a response (passthrough model will echo something back) + assert len(client.notifications) > 0 + last_update = client.notifications[-1] + assert last_update["session_id"] == session_id @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_embedded_text_resource_processing() -> None: +async def test_acp_embedded_text_resource_processing( + acp_content: tuple[object, TestClient, object], +) -> None: """Test that embedded text resource content blocks are properly processed.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Check that resource is advertised as supported - agent_caps = getattr(init_response, "agent_capabilities", None) or getattr( - init_response, "agentCapabilities", None - ) - assert agent_caps is not None - # Handle both "prompts" and "promptCapabilities" field names - prompt_caps = getattr( - agent_caps, "prompts", None - ) or getattr(agent_caps, "promptCapabilities", None) - assert prompt_caps is not None - # Check if embeddedContext capability is enabled - assert getattr(prompt_caps, "embeddedContext", False) is True - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = getattr(session_response, "session_id", None) or getattr( - session_response, "sessionId", None - ) - assert session_id - - # Send prompt with text resource - prompt_blocks = [ - text_block("Review this code:"), - EmbeddedResourceContentBlock( - type="resource", - resource=TextResourceContents( - uri="file:///example.py", - mime_type="text/x-python", - text="def hello():\n return 'Hello, world!'", - ), + connection, client, init_response = acp_content + + # Check that resource is advertised as supported + agent_caps = getattr(init_response, "agent_capabilities", None) or getattr( + init_response, "agentCapabilities", None + ) + assert agent_caps is not None + # Handle both "prompts" and "promptCapabilities" field names + prompt_caps = getattr(agent_caps, "prompts", None) or getattr( + agent_caps, "promptCapabilities", None + ) + assert prompt_caps is not None + # Check if embeddedContext capability is enabled + assert getattr(prompt_caps, "embeddedContext", False) is True + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = getattr(session_response, "session_id", None) or getattr( + session_response, "sessionId", None + ) + assert session_id + + # Send prompt with text resource + prompt_blocks = [ + text_block("Review this code:"), + EmbeddedResourceContentBlock( + type="resource", + resource=TextResourceContents( + uri="file:///example.py", + mime_type="text/x-python", + text="def hello():\n return 'Hello, world!'", ), - ] + ), + ] - prompt_response = await connection.prompt( - session_id=session_id, - prompt=prompt_blocks, - ) + prompt_response = await connection.prompt( + session_id=session_id, + prompt=prompt_blocks, + ) - # Should complete successfully - stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( - prompt_response, "stopReason", None - ) - assert stop_reason == END_TURN + # Should complete successfully + stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( + prompt_response, "stopReason", None + ) + assert stop_reason == END_TURN - # Wait for notifications - await _wait_for_notifications(client) + # Wait for notifications + await _wait_for_notifications(client) - # Verify we got a response - assert len(client.notifications) > 0 - last_update = client.notifications[-1] - assert last_update["session_id"] == session_id + # Verify we got a response + assert len(client.notifications) > 0 + last_update = client.notifications[-1] + assert last_update["session_id"] == session_id @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_embedded_blob_resource_processing() -> None: +async def test_acp_embedded_blob_resource_processing( + acp_content: tuple[object, TestClient, object], +) -> None: """Test that embedded blob resource content blocks are properly processed.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize and create session - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, + connection, client, _init_response = acp_content + + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = getattr(session_response, "session_id", None) or getattr( + session_response, "sessionId", None + ) + assert session_id + + # Create fake binary data + fake_blob_data = base64.b64encode(b"fake-binary-document-data").decode("utf-8") + + # Send prompt with blob resource + prompt_blocks = [ + text_block("Summarize this document:"), + EmbeddedResourceContentBlock( + type="resource", + resource=BlobResourceContents( + uri="file:///document.pdf", + mime_type="application/pdf", + blob=fake_blob_data, ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = getattr(session_response, "session_id", None) or getattr( - session_response, "sessionId", None - ) - assert session_id - - # Create fake binary data - fake_blob_data = base64.b64encode(b"fake-binary-document-data").decode("utf-8") - - # Send prompt with blob resource - prompt_blocks = [ - text_block("Summarize this document:"), - EmbeddedResourceContentBlock( - type="resource", - resource=BlobResourceContents( - uri="file:///document.pdf", - mime_type="application/pdf", - blob=fake_blob_data, - ), - ), - ] + ), + ] - prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) + prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) - # Should complete successfully - stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( - prompt_response, "stopReason", None - ) - assert stop_reason == END_TURN + # Should complete successfully + stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( + prompt_response, "stopReason", None + ) + assert stop_reason == END_TURN - # Wait for notifications - await _wait_for_notifications(client) + # Wait for notifications + await _wait_for_notifications(client) - # Verify we got a response - assert len(client.notifications) > 0 + # Verify we got a response + assert len(client.notifications) > 0 @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_mixed_content_blocks() -> None: +async def test_acp_mixed_content_blocks( + acp_content: tuple[object, TestClient, object], +) -> None: """Test that mixed content blocks (text, image, resource) work together.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize and create session - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = getattr(session_response, "session_id", None) or getattr( - session_response, "sessionId", None - ) - - # Create mixed content - image_data = base64.b64encode(b"fake-screenshot").decode("utf-8") - - prompt_blocks = [ - text_block("I need help with this code:"), - EmbeddedResourceContentBlock( - type="resource", - resource=TextResourceContents( - uri="file:///app.py", - mime_type="text/x-python", - text="import sys\nprint(sys.version)", - ), - ), - text_block("And here's a screenshot of the error:"), - ImageContentBlock( - type="image", - data=image_data, - mime_type="image/png", + connection, client, _init_response = acp_content + + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = getattr(session_response, "session_id", None) or getattr( + session_response, "sessionId", None + ) + + # Create mixed content + image_data = base64.b64encode(b"fake-screenshot").decode("utf-8") + + prompt_blocks = [ + text_block("I need help with this code:"), + EmbeddedResourceContentBlock( + type="resource", + resource=TextResourceContents( + uri="file:///app.py", + mime_type="text/x-python", + text="import sys\nprint(sys.version)", ), - text_block("What's wrong?"), - ] + ), + text_block("And here's a screenshot of the error:"), + ImageContentBlock( + type="image", + data=image_data, + mime_type="image/png", + ), + text_block("What's wrong?"), + ] - prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) + prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) - # Should complete successfully - stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( - prompt_response, "stopReason", None - ) - assert stop_reason == END_TURN + # Should complete successfully + stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( + prompt_response, "stopReason", None + ) + assert stop_reason == END_TURN - # Wait for notifications - await _wait_for_notifications(client) + # Wait for notifications + await _wait_for_notifications(client) - # Verify we got a response - assert len(client.notifications) > 0 - last_update = client.notifications[-1] - assert last_update["session_id"] == session_id + # Verify we got a response + assert len(client.notifications) > 0 + last_update = client.notifications[-1] + assert last_update["session_id"] == session_id @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_resource_only_prompt_not_slash_command() -> None: +async def test_acp_resource_only_prompt_not_slash_command( + acp_content: tuple[object, TestClient, object], +) -> None: """ Test that resource-only prompts with text starting with "/" are not treated as slash commands. This verifies the fix for the issue where resource content (like file contents) that happens to start with "/" was incorrectly being detected as a slash command. """ - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize and create session - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, + connection, client, _init_response = acp_content + + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = getattr(session_response, "session_id", None) or getattr( + session_response, "sessionId", None + ) + assert session_id + + # Send a resource-only prompt with text starting with "/" + # This should NOT be treated as a slash command + prompt_blocks = [ + EmbeddedResourceContentBlock( + type="resource", + resource=TextResourceContents( + uri="file:///C:/Users/shaun/AppData/Roaming/Zed/settings.json", + mime_type="application/json", + text="//hello, world!", ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = getattr(session_response, "session_id", None) or getattr( - session_response, "sessionId", None - ) - assert session_id - - # Send a resource-only prompt with text starting with "/" - # This should NOT be treated as a slash command - prompt_blocks = [ - EmbeddedResourceContentBlock( - type="resource", - resource=TextResourceContents( - uri="file:///C:/Users/shaun/AppData/Roaming/Zed/settings.json", - mime_type="application/json", - text="//hello, world!", - ), - ), - ] - - prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) - - # Should complete successfully with END_TURN, not be treated as an unknown slash command - stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( - prompt_response, "stopReason", None - ) - assert stop_reason == END_TURN - - # Wait for notifications - await _wait_for_notifications(client) - - # Verify we got a response from the agent (passthrough model) - # If it was incorrectly treated as a slash command, we'd get "Unknown command" response - assert len(client.notifications) > 0 - last_update = client.notifications[-1] - assert last_update["session_id"] == session_id - - # The response should contain the echoed resource text, not an error about unknown command - # (passthrough model echoes the input) - response_text = str(last_update["update"]) - assert "Unknown command" not in response_text + ), + ] + + prompt_response = await connection.prompt(session_id=session_id, prompt=prompt_blocks) + + # Should complete successfully with END_TURN, not be treated as an unknown slash command + stop_reason = getattr(prompt_response, "stop_reason", None) or getattr( + prompt_response, "stopReason", None + ) + assert stop_reason == END_TURN + + # Wait for notifications + await _wait_for_notifications(client) + + # Verify we got a response from the agent (passthrough model) + # If it was incorrectly treated as a slash command, we'd get "Unknown command" response + assert len(client.notifications) > 0 + last_update = client.notifications[-1] + assert last_update["session_id"] == session_id + + # The response should contain the echoed resource text, not an error about unknown command + # (passthrough model echoes the input) + response_text = str(last_update["update"]) + assert "Unknown command" not in response_text async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> None: diff --git a/tests/integration/acp/test_acp_filesystem_toolcall.py b/tests/integration/acp/test_acp_filesystem_toolcall.py index fc34756b1..a7a153767 100644 --- a/tests/integration/acp/test_acp_filesystem_toolcall.py +++ b/tests/integration/acp/test_acp_filesystem_toolcall.py @@ -8,7 +8,7 @@ import pytest from acp.helpers import text_block -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, StopReason +from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: @@ -16,7 +16,8 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" @@ -28,27 +29,6 @@ def _get_stop_reason(response: object) -> str | None: return getattr(response, "stop_reason", None) or getattr(response, "stopReason", None) -def get_fast_agent_cmd() -> tuple: - """Build the fast-agent command with appropriate flags.""" - cmd = [ - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--model", - "passthrough", # Use passthrough model for deterministic testing - "--name", - "fast-agent-acp-filesystem-toolcall-test", - # Disable permission checks - these tests focus on filesystem functionality - "--no-permissions", - ] - return tuple(cmd) - - async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> None: """Wait for the ACP client to receive at least one sessionUpdate.""" loop = asyncio.get_running_loop() @@ -61,105 +41,75 @@ async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> N @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_filesystem_read_tool_call() -> None: +async def test_acp_filesystem_read_tool_call( + acp_filesystem_toolcall: tuple[object, TestClient, object], +) -> None: """Test that read_text_file tool can be called via passthrough model.""" - from acp.stdio import spawn_agent_process - - client = TestClient() + connection, client, init_response = acp_filesystem_toolcall # Set up a test file in the client test_path = "/test/sample.txt" test_content = "Hello from test file!" client.files[test_path] = test_content - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd()) as ( - connection, - _process, - ): - # Initialize with filesystem support enabled - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-filesystem-client", version="0.0.1"), - ) - - assert getattr(init_response, "protocol_version", None) == 1 or getattr( - init_response, "protocolVersion", None - ) == 1 - assert ( - getattr(init_response, "agent_capabilities", None) - or getattr(init_response, "agentCapabilities", None) - is not None - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - assert session_id - - # Use passthrough model's ***CALL_TOOL directive to invoke read_text_file - prompt_text = f'***CALL_TOOL read_text_file {{"path": "{test_path}"}}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - - # Should complete successfully - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client) - - # Verify we got notifications - assert len(client.notifications) > 0 - - # Verify the file content appears in the notifications - # This confirms the read_text_file tool was called and returned content - notification_text = str(client.notifications) - assert test_content in notification_text or test_path in notification_text + assert getattr(init_response, "protocol_version", None) == 1 or getattr( + init_response, "protocolVersion", None + ) == 1 + assert ( + getattr(init_response, "agent_capabilities", None) + or getattr(init_response, "agentCapabilities", None) + is not None + ) + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + assert session_id + + # Use passthrough model's ***CALL_TOOL directive to invoke read_text_file + prompt_text = f'***CALL_TOOL read_text_file {{"path": "{test_path}"}}' + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + + # Should complete successfully + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client) + + # Verify we got notifications + assert len(client.notifications) > 0 + + # Verify the file content appears in the notifications + # This confirms the read_text_file tool was called and returned content + notification_text = str(client.notifications) + assert test_content in notification_text or test_path in notification_text @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_filesystem_write_tool_call() -> None: +async def test_acp_filesystem_write_tool_call( + acp_filesystem_toolcall: tuple[object, TestClient, object], +) -> None: """Test that write_text_file tool can be called via passthrough model.""" - from acp.stdio import spawn_agent_process - - client = TestClient() - - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd()) as ( - connection, - _process, - ): - # Initialize with filesystem support enabled - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-filesystem-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - assert session_id - - # Use passthrough model's ***CALL_TOOL directive to invoke write_text_file - test_path = "/test/output.txt" - test_content = "Test content from tool call" - prompt_text = f'***CALL_TOOL write_text_file {{"path": "{test_path}", "content": "{test_content}"}}' - - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - - # Should complete successfully - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client) - - # Verify the file was written - assert test_path in client.files - assert client.files[test_path] == test_content + connection, client, _init_response = acp_filesystem_toolcall + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + assert session_id + + # Use passthrough model's ***CALL_TOOL directive to invoke write_text_file + test_path = "/test/output.txt" + test_content = "Test content from tool call" + prompt_text = f'***CALL_TOOL write_text_file {{"path": "{test_path}", "content": "{test_content}"}}' + + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + + # Should complete successfully + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client) + + # Verify the file was written + assert test_path in client.files + assert client.files[test_path] == test_content diff --git a/tests/integration/acp/test_acp_permissions.py b/tests/integration/acp/test_acp_permissions.py index ff84372f1..43de0fe6a 100644 --- a/tests/integration/acp/test_acp_permissions.py +++ b/tests/integration/acp/test_acp_permissions.py @@ -14,8 +14,7 @@ import pytest from acp.helpers import text_block -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, StopReason -from acp.stdio import spawn_agent_process +from acp.schema import StopReason from fast_agent.mcp.common import create_namespaced_name @@ -25,7 +24,8 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" @@ -39,29 +39,6 @@ def _get_stop_reason(response: object) -> str | None: return getattr(response, "stop_reason", None) or getattr(response, "stopReason", None) -def _get_fast_agent_cmd(cwd: str | None = None, no_permissions: bool = False) -> tuple: - """Build the fast-agent command with optional flags.""" - cmd = [ - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--servers", - "progress_test", - "--model", - "passthrough", - "--name", - "fast-agent-acp-test", - ] - if no_permissions: - cmd.append("--no-permissions") - return tuple(cmd) - - async def _wait_for_notifications(client: TestClient, count: int = 1, timeout: float = 2.0) -> None: """Wait for the ACP client to receive specified number of notifications.""" loop = asyncio.get_running_loop() @@ -99,249 +76,157 @@ def _tool_was_denied(client: TestClient) -> bool: @pytest.mark.integration -@pytest.mark.asyncio -async def test_permission_request_sent_when_tool_called() -> None: +async def test_permission_request_sent_when_tool_called( + acp_permissions: tuple[object, TestClient, object], +) -> None: """Test that a permission request is sent when a tool is called.""" - client = TestClient() + connection, client, _init_response = acp_permissions # Queue a rejection so the tool doesn't actually execute client.queue_permission_cancelled() - async with spawn_agent_process(lambda _: client, *_get_fast_agent_cmd()) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) - # Send a prompt that will trigger a tool call - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + # Send a prompt that will trigger a tool call + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - # The tool should have been denied (permission cancelled) - assert _get_stop_reason(prompt_response) == END_TURN + # The tool should have been denied (permission cancelled) + assert _get_stop_reason(prompt_response) == END_TURN - # Wait for notifications to be received - await _wait_for_notifications(client, count=2, timeout=3.0) + # Wait for notifications to be received + await _wait_for_notifications(client, count=2, timeout=3.0) - # Tool should not have executed successfully (permission was cancelled) - assert not _tool_executed_successfully(client), "Tool should not have executed when permission cancelled" + # Tool should not have executed successfully (permission was cancelled) + assert not _tool_executed_successfully(client), ( + "Tool should not have executed when permission cancelled" + ) @pytest.mark.integration -@pytest.mark.asyncio -async def test_allow_once_permits_execution_without_persistence() -> None: +async def test_allow_once_permits_execution_without_persistence( + acp_permissions: tuple[object, TestClient, object], +) -> None: """Test that allow_once permits execution but doesn't persist.""" - client = TestClient() + connection, client, _init_response = acp_permissions # Queue allow_once client.queue_permission_selected("allow_once") with tempfile.TemporaryDirectory() as tmpdir: - async with spawn_agent_process(lambda _: client, *_get_fast_agent_cmd()) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session with temp dir as cwd - session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) - session_id = _get_session_id(session_response) - - # Send a prompt that will trigger a tool call - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' - prompt_response = await connection.prompt( - session_id=session_id, prompt=[text_block(prompt_text)] - ) - - # The tool should have executed successfully - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=3, timeout=3.0) - - # Tool should have executed successfully - assert _tool_executed_successfully(client), "Tool should have executed with allow_once" - - # No auths.md file should exist (allow_once doesn't persist) - auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" - assert not auths_file.exists() + # Create session with temp dir as cwd + session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) + session_id = _get_session_id(session_response) + + # Send a prompt that will trigger a tool call + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) + + # The tool should have executed successfully + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=3, timeout=3.0) + + # Tool should have executed successfully + assert _tool_executed_successfully(client), "Tool should have executed with allow_once" + + # No auths.md file should exist (allow_once doesn't persist) + auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" + assert not auths_file.exists() @pytest.mark.integration -@pytest.mark.asyncio -async def test_allow_always_persists() -> None: +async def test_allow_always_persists( + acp_permissions: tuple[object, TestClient, object], +) -> None: """Test that allow_always permits execution and persists.""" - client = TestClient() + connection, client, _init_response = acp_permissions # Queue allow_always client.queue_permission_selected("allow_always") with tempfile.TemporaryDirectory() as tmpdir: - async with spawn_agent_process(lambda _: client, *_get_fast_agent_cmd()) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session with temp dir as cwd - session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) - session_id = _get_session_id(session_response) - - # Send a prompt that will trigger a tool call - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' - prompt_response = await connection.prompt( - session_id=session_id, prompt=[text_block(prompt_text)] - ) - - # The tool should have executed successfully - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=3, timeout=3.0) - - # Tool should have executed successfully - assert _tool_executed_successfully(client), "Tool should have executed with allow_always" - - # auths.md file should exist with allow_always - auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" - assert auths_file.exists() - content = auths_file.read_text() - assert "allow_always" in content - assert "progress_task" in content + # Create session with temp dir as cwd + session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) + session_id = _get_session_id(session_response) + + # Send a prompt that will trigger a tool call + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) + + # The tool should have executed successfully + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=3, timeout=3.0) + + # Tool should have executed successfully + assert _tool_executed_successfully(client), "Tool should have executed with allow_always" + + # auths.md file should exist with allow_always + auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" + assert auths_file.exists() + content = auths_file.read_text() + assert "allow_always" in content + assert "progress_task" in content @pytest.mark.integration -@pytest.mark.asyncio -async def test_reject_once_blocks_without_persistence() -> None: +async def test_reject_once_blocks_without_persistence( + acp_permissions: tuple[object, TestClient, object], +) -> None: """Test that reject_once blocks execution but doesn't persist.""" - client = TestClient() + connection, client, _init_response = acp_permissions # Queue reject_once client.queue_permission_selected("reject_once") with tempfile.TemporaryDirectory() as tmpdir: - async with spawn_agent_process(lambda _: client, *_get_fast_agent_cmd()) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session with temp dir as cwd - session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) - session_id = _get_session_id(session_response) - - # Send a prompt that will trigger a tool call - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' - prompt_response = await connection.prompt( - session_id=session_id, prompt=[text_block(prompt_text)] - ) - - # The tool should have been rejected - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=2, timeout=3.0) - - # Tool should not have executed successfully - assert not _tool_executed_successfully(client), "Tool should not have executed with reject_once" - - # No auths.md file should exist (reject_once doesn't persist) - auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" - assert not auths_file.exists() + # Create session with temp dir as cwd + session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) + session_id = _get_session_id(session_response) + + # Send a prompt that will trigger a tool call + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) + + # The tool should have been rejected + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=2, timeout=3.0) + + # Tool should not have executed successfully + assert not _tool_executed_successfully(client), ( + "Tool should not have executed with reject_once" + ) + + # No auths.md file should exist (reject_once doesn't persist) + auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" + assert not auths_file.exists() @pytest.mark.integration -@pytest.mark.asyncio -async def test_reject_always_blocks_and_persists() -> None: +async def test_reject_always_blocks_and_persists( + acp_permissions: tuple[object, TestClient, object], +) -> None: """Test that reject_always blocks execution and persists.""" - client = TestClient() + connection, client, _init_response = acp_permissions # Queue reject_always client.queue_permission_selected("reject_always") with tempfile.TemporaryDirectory() as tmpdir: - async with spawn_agent_process(lambda _: client, *_get_fast_agent_cmd()) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session with temp dir as cwd - session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) - session_id = _get_session_id(session_response) - - # Send a prompt that will trigger a tool call - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' - prompt_response = await connection.prompt( - session_id=session_id, prompt=[text_block(prompt_text)] - ) - - # The tool should have been rejected - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=2, timeout=3.0) - - # Tool should not have executed successfully - assert not _tool_executed_successfully(client), "Tool should not have executed with reject_always" - - # auths.md file should exist with reject_always - auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" - assert auths_file.exists() - content = auths_file.read_text() - assert "reject_always" in content - assert "progress_task" in content - - -@pytest.mark.integration -@pytest.mark.asyncio -async def test_no_permissions_flag_disables_checks() -> None: - """Test that --no-permissions flag allows all tool executions.""" - client = TestClient() - # Don't queue any permission response - should not be needed - - async with spawn_agent_process(lambda _: client, *_get_fast_agent_cmd(no_permissions=True)) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + # Create session with temp dir as cwd + session_response = await connection.new_session(mcp_servers=[], cwd=tmpdir) session_id = _get_session_id(session_response) # Send a prompt that will trigger a tool call @@ -351,11 +236,49 @@ async def test_no_permissions_flag_disables_checks() -> None: session_id=session_id, prompt=[text_block(prompt_text)] ) - # The tool should have executed without permission request + # The tool should have been rejected assert _get_stop_reason(prompt_response) == END_TURN # Wait for notifications - await _wait_for_notifications(client, count=3, timeout=3.0) + await _wait_for_notifications(client, count=2, timeout=3.0) + + # Tool should not have executed successfully + assert not _tool_executed_successfully(client), ( + "Tool should not have executed with reject_always" + ) + + # auths.md file should exist with reject_always + auths_file = Path(tmpdir) / ".fast-agent" / "auths.md" + assert auths_file.exists() + content = auths_file.read_text() + assert "reject_always" in content + assert "progress_task" in content + + +@pytest.mark.integration +async def test_no_permissions_flag_disables_checks( + acp_permissions_no_perms: tuple[object, TestClient, object], +) -> None: + """Test that --no-permissions flag allows all tool executions.""" + connection, client, _init_response = acp_permissions_no_perms + # Don't queue any permission response - should not be needed + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + + # Send a prompt that will trigger a tool call + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) + + # The tool should have executed without permission request + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=3, timeout=3.0) - # Tool should have executed successfully without needing permission - assert _tool_executed_successfully(client), "Tool should have executed with --no-permissions flag" + # Tool should have executed successfully without needing permission + assert _tool_executed_successfully(client), "Tool should have executed with --no-permissions flag" diff --git a/tests/integration/acp/test_acp_runtime_telemetry.py b/tests/integration/acp/test_acp_runtime_telemetry.py index da9f04338..13f4c0154 100644 --- a/tests/integration/acp/test_acp_runtime_telemetry.py +++ b/tests/integration/acp/test_acp_runtime_telemetry.py @@ -14,8 +14,7 @@ import pytest from acp.helpers import text_block -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, StopReason -from acp.stdio import spawn_agent_process +from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: @@ -23,7 +22,8 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" @@ -45,29 +45,6 @@ def _get_session_update_type(update: Any) -> str | None: return None -def get_fast_agent_cmd(with_shell: bool = True) -> tuple: - """Build the fast-agent command with appropriate flags.""" - cmd = [ - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--model", - "passthrough", - "--name", - "fast-agent-acp-runtime-telemetry-test", - # Disable permission checks - these tests focus on telemetry functionality - "--no-permissions", - ] - if with_shell: - cmd.append("--shell") - return tuple(cmd) - - async def _wait_for_notifications(client: TestClient, count: int = 1, timeout: float = 3.0) -> None: """Wait for the ACP client to receive specified number of notifications.""" loop = asyncio.get_running_loop() @@ -80,196 +57,156 @@ async def _wait_for_notifications(client: TestClient, count: int = 1, timeout: f @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_terminal_runtime_telemetry() -> None: +async def test_acp_terminal_runtime_telemetry( + acp_runtime_telemetry_shell: tuple[object, TestClient, object], +) -> None: """Test that terminal execute operations trigger tool call notifications.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd(with_shell=True)) as ( - connection, - _process, - ): - # Initialize with terminal support enabled - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=True, - ), - client_info=Implementation(name="pytest-telemetry-client", version="0.0.1"), - ) - assert getattr(init_response, "protocol_version", None) == 1 or getattr( - init_response, "protocolVersion", None - ) == 1 - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - assert session_id - - # Call the execute tool via passthrough model - prompt_text = '***CALL_TOOL execute {"command": "echo test"}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=2, timeout=3.0) - - # Check that we received tool call notifications - tool_notifications = [ - n - for n in client.notifications - if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] - ] - - # Should have at least one tool_call notification - assert len(tool_notifications) > 0, "Expected tool call notifications for execute" - - # First notification should be tool_call (initial) - first_notif = tool_notifications[0]["update"] - assert _get_session_update_type(first_notif) == "tool_call" - assert hasattr(first_notif, "toolCallId") - assert hasattr(first_notif, "title") - assert hasattr(first_notif, "kind") - assert hasattr(first_notif, "status") - - # Verify the title contains "execute" and "acp_terminal" - title = first_notif.title - assert "execute" in title.lower() or "acp_terminal" in title.lower() - - # Status should start as pending - assert first_notif.status == "pending" - - # Last notification should be completed or failed - if len(tool_notifications) > 1: - last_status = tool_notifications[-1]["update"].status - assert last_status in ["completed", "failed"], ( - f"Expected final status, got {last_status}" - ) + connection, client, init_response = acp_runtime_telemetry_shell + + assert getattr(init_response, "protocol_version", None) == 1 or getattr( + init_response, "protocolVersion", None + ) == 1 + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + assert session_id + + # Call the execute tool via passthrough model + prompt_text = '***CALL_TOOL execute {"command": "echo test"}' + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=2, timeout=3.0) + + # Check that we received tool call notifications + tool_notifications = [ + n + for n in client.notifications + if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] + ] + + # Should have at least one tool_call notification + assert len(tool_notifications) > 0, "Expected tool call notifications for execute" + + # First notification should be tool_call (initial) + first_notif = tool_notifications[0]["update"] + assert _get_session_update_type(first_notif) == "tool_call" + assert hasattr(first_notif, "toolCallId") + assert hasattr(first_notif, "title") + assert hasattr(first_notif, "kind") + assert hasattr(first_notif, "status") + + # Verify the title contains "execute" and "acp_terminal" + title = first_notif.title + assert "execute" in title.lower() or "acp_terminal" in title.lower() + + # Status should start as pending + assert first_notif.status == "pending" + + # Last notification should be completed or failed + if len(tool_notifications) > 1: + last_status = tool_notifications[-1]["update"].status + assert last_status in ["completed", "failed"], f"Expected final status, got {last_status}" @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_filesystem_read_runtime_telemetry() -> None: +async def test_acp_filesystem_read_runtime_telemetry( + acp_runtime_telemetry: tuple[object, TestClient, object], +) -> None: """Test that read_text_file operations trigger tool call notifications.""" - client = TestClient() + connection, client, _init_response = acp_runtime_telemetry # Set up a test file in the client test_path = "/test/sample.txt" test_content = "Hello from test file!" client.files[test_path] = test_content - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd(with_shell=False)) as ( - connection, - _process, - ): - # Initialize with filesystem support enabled - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-telemetry-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - - # Call the read_text_file tool via passthrough model - prompt_text = f'***CALL_TOOL read_text_file {{"path": "{test_path}"}}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=2, timeout=3.0) - - # Check that we received tool call notifications - tool_notifications = [ - n - for n in client.notifications - if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] - ] - - # Should have at least one tool_call notification - assert len(tool_notifications) > 0, "Expected tool call notifications for read_text_file" - - # First notification should be tool_call (initial) - first_notif = tool_notifications[0]["update"] - assert _get_session_update_type(first_notif) == "tool_call" - assert hasattr(first_notif, "toolCallId") - assert hasattr(first_notif, "title") - - # Verify the title contains "read_text_file" and "acp_filesystem" - title = first_notif.title - assert "read_text_file" in title.lower() or "acp_filesystem" in title.lower() - - # Last notification should be completed - if len(tool_notifications) > 1: - last_status = tool_notifications[-1]["update"].status - assert last_status in ["completed", "failed"] + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + + # Call the read_text_file tool via passthrough model + prompt_text = f'***CALL_TOOL read_text_file {{"path": "{test_path}"}}' + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=2, timeout=3.0) + + # Check that we received tool call notifications + tool_notifications = [ + n + for n in client.notifications + if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] + ] + + # Should have at least one tool_call notification + assert len(tool_notifications) > 0, "Expected tool call notifications for read_text_file" + + # First notification should be tool_call (initial) + first_notif = tool_notifications[0]["update"] + assert _get_session_update_type(first_notif) == "tool_call" + assert hasattr(first_notif, "toolCallId") + assert hasattr(first_notif, "title") + + # Verify the title contains "read_text_file" and "acp_filesystem" + title = first_notif.title + assert "read_text_file" in title.lower() or "acp_filesystem" in title.lower() + + # Last notification should be completed + if len(tool_notifications) > 1: + last_status = tool_notifications[-1]["update"].status + assert last_status in ["completed", "failed"] @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_filesystem_write_runtime_telemetry() -> None: +async def test_acp_filesystem_write_runtime_telemetry( + acp_runtime_telemetry: tuple[object, TestClient, object], +) -> None: """Test that write_text_file operations trigger tool call notifications.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd(with_shell=False)) as ( - connection, - _process, - ): - # Initialize with filesystem support enabled - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-telemetry-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - - # Call the write_text_file tool via passthrough model - test_path = "/test/output.txt" - test_content = "Test content from tool call" - prompt_text = f'***CALL_TOOL write_text_file {{"path": "{test_path}", "content": "{test_content}"}}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=2, timeout=3.0) - - # Check that we received tool call notifications - tool_notifications = [ - n - for n in client.notifications - if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] - ] - - # Should have at least one tool_call notification - assert len(tool_notifications) > 0, "Expected tool call notifications for write_text_file" - - # First notification should be tool_call (initial) - first_notif = tool_notifications[0]["update"] - assert _get_session_update_type(first_notif) == "tool_call" - assert hasattr(first_notif, "toolCallId") - assert hasattr(first_notif, "title") - - # Verify the title contains "write_text_file" and "acp_filesystem" - title = first_notif.title - assert "write_text_file" in title.lower() or "acp_filesystem" in title.lower() - - # Verify the file was written - assert test_path in client.files - assert client.files[test_path] == test_content - - # Last notification should be completed - if len(tool_notifications) > 1: - last_status = tool_notifications[-1]["update"].status - assert last_status in ["completed", "failed"] + connection, client, _init_response = acp_runtime_telemetry + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + + # Call the write_text_file tool via passthrough model + test_path = "/test/output.txt" + test_content = "Test content from tool call" + prompt_text = f'***CALL_TOOL write_text_file {{"path": "{test_path}", "content": "{test_content}"}}' + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=2, timeout=3.0) + + # Check that we received tool call notifications + tool_notifications = [ + n + for n in client.notifications + if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] + ] + + # Should have at least one tool_call notification + assert len(tool_notifications) > 0, "Expected tool call notifications for write_text_file" + + # First notification should be tool_call (initial) + first_notif = tool_notifications[0]["update"] + assert _get_session_update_type(first_notif) == "tool_call" + assert hasattr(first_notif, "toolCallId") + assert hasattr(first_notif, "title") + + # Verify the title contains "write_text_file" and "acp_filesystem" + title = first_notif.title + assert "write_text_file" in title.lower() or "acp_filesystem" in title.lower() + + # Verify the file was written + assert test_path in client.files + assert client.files[test_path] == test_content + + # Last notification should be completed + if len(tool_notifications) > 1: + last_status = tool_notifications[-1]["update"].status + assert last_status in ["completed", "failed"] diff --git a/tests/integration/acp/test_acp_status_line.py b/tests/integration/acp/test_acp_status_line.py index 08159f779..4a0b58676 100644 --- a/tests/integration/acp/test_acp_status_line.py +++ b/tests/integration/acp/test_acp_status_line.py @@ -8,8 +8,7 @@ import pytest from acp.helpers import text_block -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, StopReason -from acp.stdio import spawn_agent_process +from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: @@ -17,22 +16,9 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" -FAST_AGENT_CMD = ( - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--model", - "passthrough", - "--name", - "fast-agent-acp-test", -) def _extract_status_line(meta: object) -> str | None: @@ -67,29 +53,20 @@ async def _wait_for_status_line(client: TestClient, timeout: float = 2.0) -> str @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_status_line_meta_is_emitted() -> None: - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) +async def test_acp_status_line_meta_is_emitted( + acp_basic: tuple[object, TestClient, object], +) -> None: + connection, client, _init_response = acp_basic - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = session_response.session_id - assert session_id + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = session_response.session_id + assert session_id - prompt_text = "status line integration test" - prompt_response = await connection.prompt( - session_id=session_id, prompt=[text_block(prompt_text)] - ) - assert prompt_response.stop_reason == END_TURN + prompt_text = "status line integration test" + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) + assert prompt_response.stop_reason == END_TURN - status_line = await _wait_for_status_line(client) - assert re.search(r"\d[\d,]* in, \d[\d,]* out", status_line), status_line + status_line = await _wait_for_status_line(client) + assert re.search(r"\d[\d,]* in, \d[\d,]* out", status_line), status_line diff --git a/tests/integration/acp/test_acp_terminal.py b/tests/integration/acp/test_acp_terminal.py index 178250b71..5711ddfdd 100644 --- a/tests/integration/acp/test_acp_terminal.py +++ b/tests/integration/acp/test_acp_terminal.py @@ -8,8 +8,7 @@ import pytest from acp.helpers import text_block -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, StopReason -from acp.stdio import spawn_agent_process +from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: @@ -17,7 +16,8 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" @@ -29,182 +29,109 @@ def _get_stop_reason(response: object) -> str | None: return getattr(response, "stop_reason", None) or getattr(response, "stopReason", None) -def get_fast_agent_cmd(with_shell: bool = True) -> tuple: - """Build the fast-agent command with appropriate flags.""" - cmd = [ - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--model", - "passthrough", - "--name", - "fast-agent-acp-terminal-test", - ] - if with_shell: - cmd.append("--shell") - return tuple(cmd) - - @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_terminal_support_enabled() -> None: +async def test_acp_terminal_support_enabled( + acp_terminal_shell: tuple[object, TestClient, object], +) -> None: """Test that terminal support is properly enabled when client advertises capability.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd(with_shell=True)) as ( - connection, - _process, - ): - # Initialize with terminal support enabled - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=True, # Enable terminal support - ), - client_info=Implementation(name="pytest-terminal-client", version="0.0.1"), - ) - - assert getattr(init_response, "protocol_version", None) == 1 or getattr( - init_response, "protocolVersion", None - ) == 1 - assert ( - getattr(init_response, "agent_capabilities", None) - or getattr(init_response, "agentCapabilities", None) - is not None - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - assert session_id - - # Send prompt that should trigger terminal execution - # The passthrough model will echo our input, so we craft a tool call request - prompt_text = 'use the execute tool to run: echo "test terminal"' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for any notifications - await _wait_for_notifications(client) - - # Verify we got notifications - assert len(client.notifications) > 0 + connection, client, init_response = acp_terminal_shell + + assert getattr(init_response, "protocol_version", None) == 1 or getattr( + init_response, "protocolVersion", None + ) == 1 + assert ( + getattr(init_response, "agent_capabilities", None) + or getattr(init_response, "agentCapabilities", None) + is not None + ) + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + assert session_id + + # Send prompt that should trigger terminal execution + # The passthrough model will echo our input, so we craft a tool call request + prompt_text = 'use the execute tool to run: echo "test terminal"' + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for any notifications + await _wait_for_notifications(client) + + # Verify we got notifications + assert len(client.notifications) > 0 @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_terminal_execution() -> None: +async def test_acp_terminal_execution( + acp_terminal_shell: tuple[object, TestClient, object], +) -> None: """Test actual terminal command execution via ACP.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd(with_shell=True)) as ( - connection, - _process, - ): - # Initialize with terminal support - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=True, - ), - client_info=Implementation(name="pytest-terminal-client", version="0.0.1"), - ) - - # Directly test terminal methods are being called - # Since we're using passthrough model, we can't test actual LLM-driven tool calls - # but we can verify the terminal runtime is set up correctly - - # Create a session first to get a session ID - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - - # The terminals dict should be empty initially - assert len(client.terminals) == 0 - - # Manually test terminal lifecycle (client creates ID) - create_result = await client.create_terminal(command="echo test", session_id=session_id) - terminal_id = create_result.terminal_id - - # Verify terminal was created with client-generated ID - assert terminal_id == "terminal-1" # First terminal - assert terminal_id in client.terminals - assert client.terminals[terminal_id]["command"] == "echo test" - - # Get output - output = await client.terminal_output(session_id=session_id, terminal_id=terminal_id) - assert "Executed: echo test" in output.output - exit_info = await client.wait_for_terminal_exit(session_id=session_id, terminal_id=terminal_id) - assert exit_info.exit_code == 0 - - # Release terminal - await client.release_terminal(session_id=session_id, terminal_id=terminal_id) - assert terminal_id not in client.terminals + connection, client, _init_response = acp_terminal_shell + + # Directly test terminal methods are being called + # Since we're using passthrough model, we can't test actual LLM-driven tool calls + # but we can verify the terminal runtime is set up correctly + + # Create a session first to get a session ID + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + + # The terminals dict should be empty initially + assert len(client.terminals) == 0 + + # Manually test terminal lifecycle (client creates ID) + create_result = await client.create_terminal(command="echo test", session_id=session_id) + terminal_id = create_result.terminal_id + + # Verify terminal was created with client-generated ID + assert terminal_id == "terminal-1" # First terminal + assert terminal_id in client.terminals + assert client.terminals[terminal_id]["command"] == "echo test" + + # Get output + output = await client.terminal_output(session_id=session_id, terminal_id=terminal_id) + assert "Executed: echo test" in output.output + exit_info = await client.wait_for_terminal_exit(session_id=session_id, terminal_id=terminal_id) + assert exit_info.exit_code == 0 + + # Release terminal + await client.release_terminal(session_id=session_id, terminal_id=terminal_id) + assert terminal_id not in client.terminals @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_terminal_disabled_when_no_shell_flag() -> None: +async def test_acp_terminal_disabled_when_no_shell_flag( + acp_terminal_no_shell: tuple[object, TestClient, object], +) -> None: """Test that terminal runtime is not injected when --shell flag is not provided.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd(with_shell=False)) as ( - connection, - _process, - ): - # Initialize with terminal support (client side) - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=True, # Client supports it - ), - client_info=Implementation(name="pytest-terminal-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - assert session_id - - # Terminal runtime should not be injected because --shell wasn't provided - # This test mainly ensures no errors occur when terminal capability is advertised - # but shell runtime isn't enabled + connection, _client, _init_response = acp_terminal_no_shell + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + assert session_id + + # Terminal runtime should not be injected because --shell wasn't provided + # This test mainly ensures no errors occur when terminal capability is advertised + # but shell runtime isn't enabled @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_terminal_disabled_when_client_unsupported() -> None: +async def test_acp_terminal_disabled_when_client_unsupported( + acp_terminal_client_unsupported: tuple[object, TestClient, object], +) -> None: """Test that terminal runtime is not used when client doesn't support terminals.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *get_fast_agent_cmd(with_shell=True)) as ( - connection, - _process, - ): - # Initialize WITHOUT terminal support - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, # Client doesn't support terminals - ), - client_info=Implementation(name="pytest-terminal-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - assert session_id - - # Agent will use local ShellRuntime instead of ACP terminals - # This test ensures graceful fallback + connection, _client, _init_response = acp_terminal_client_unsupported + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + assert session_id + + # Agent will use local ShellRuntime instead of ACP terminals + # This test ensures graceful fallback async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> None: diff --git a/tests/integration/acp/test_acp_tool_notifications.py b/tests/integration/acp/test_acp_tool_notifications.py index 79dad9d11..4d5e0feaf 100644 --- a/tests/integration/acp/test_acp_tool_notifications.py +++ b/tests/integration/acp/test_acp_tool_notifications.py @@ -14,8 +14,7 @@ import pytest from acp.helpers import text_block -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, StopReason -from acp.stdio import spawn_agent_process +from acp.schema import StopReason from fast_agent.mcp.common import create_namespaced_name @@ -25,7 +24,8 @@ from test_client import TestClient # noqa: E402 -CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" +pytestmark = pytest.mark.asyncio(loop_scope="module") + END_TURN: StopReason = "end_turn" @@ -45,189 +45,139 @@ def _get_session_update_type(update: Any) -> str | None: result = update.get("sessionUpdate") return str(result) if result is not None else None return None -FAST_AGENT_CMD = ( - sys.executable, - "-m", - "fast_agent.cli", - "serve", - "--config-path", - str(CONFIG_PATH), - "--transport", - "acp", - "--servers", - "progress_test", - "--model", - "passthrough", - "--name", - "fast-agent-acp-test", - # Disable permissions for these tests as they focus on notifications, not permissions - "--no-permissions", -) - @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_tool_call_notifications() -> None: +async def test_acp_tool_call_notifications( + acp_tool_notifications: tuple[object, TestClient, object], +) -> None: """Test that tool calls generate appropriate ACP notifications.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize - init_response = await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - assert getattr(init_response, "protocol_version", None) == 1 or getattr( - init_response, "protocolVersion", None - ) == 1 - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - assert session_id - - # Send a prompt that will trigger a tool call - # Using the ***CALL_TOOL directive that the passthrough model supports - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 3}}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - assert _get_stop_reason(prompt_response) == END_TURN - - # Wait for notifications - await _wait_for_notifications(client, count=5, timeout=3.0) - - # Check notifications for tool_call and tool_call_update types - tool_notifications = [ - n - for n in client.notifications - if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] - ] - - # Should have at least one tool_call notification - assert len(tool_notifications) > 0, "Expected tool call notifications" - - # First notification should be tool_call (initial) - first_tool_notif = tool_notifications[0]["update"] - assert _get_session_update_type(first_tool_notif) == "tool_call" - assert hasattr(first_tool_notif, "toolCallId") - assert hasattr(first_tool_notif, "title") - assert hasattr(first_tool_notif, "kind") - assert hasattr(first_tool_notif, "status") - - # Status should be pending initially - assert first_tool_notif.status == "pending" - - # Subsequent notifications should be tool_call_update - if len(tool_notifications) > 1: - for notif in tool_notifications[1:]: - assert _get_session_update_type(notif["update"]) == "tool_call_update" - update = notif["update"] - assert hasattr(update, "toolCallId") or hasattr(update, "tool_call_id") - assert hasattr(update, "status") - - # Last notification should be completed or failed - last_status = tool_notifications[-1]["update"].status - assert last_status in ["completed", "failed"], ( - f"Expected final status, got {last_status}" - ) + connection, client, init_response = acp_tool_notifications + + assert getattr(init_response, "protocol_version", None) == 1 or getattr( + init_response, "protocolVersion", None + ) == 1 + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + assert session_id + + # Send a prompt that will trigger a tool call + # Using the ***CALL_TOOL directive that the passthrough model supports + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 3}}' + prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + assert _get_stop_reason(prompt_response) == END_TURN + + # Wait for notifications + await _wait_for_notifications(client, count=5, timeout=3.0) + + # Check notifications for tool_call and tool_call_update types + tool_notifications = [ + n + for n in client.notifications + if _get_session_update_type(n["update"]) in ["tool_call", "tool_call_update"] + ] + + # Should have at least one tool_call notification + assert len(tool_notifications) > 0, "Expected tool call notifications" + + # First notification should be tool_call (initial) + first_tool_notif = tool_notifications[0]["update"] + assert _get_session_update_type(first_tool_notif) == "tool_call" + assert hasattr(first_tool_notif, "toolCallId") + assert hasattr(first_tool_notif, "title") + assert hasattr(first_tool_notif, "kind") + assert hasattr(first_tool_notif, "status") + + # Status should be pending initially + assert first_tool_notif.status == "pending" + + # Subsequent notifications should be tool_call_update + if len(tool_notifications) > 1: + for notif in tool_notifications[1:]: + assert _get_session_update_type(notif["update"]) == "tool_call_update" + update = notif["update"] + assert hasattr(update, "toolCallId") or hasattr(update, "tool_call_id") + assert hasattr(update, "status") + + # Last notification should be completed or failed + last_status = tool_notifications[-1]["update"].status + assert last_status in ["completed", "failed"], f"Expected final status, got {last_status}" @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_tool_progress_updates() -> None: +async def test_acp_tool_progress_updates( + acp_tool_notifications: tuple[object, TestClient, object], +) -> None: """Test that tool progress updates are sent via tool_call_update notifications.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - - # Call a tool that reports progress - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 5}}' - await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - - # Wait for multiple progress updates - await _wait_for_notifications(client, count=7, timeout=5.0) - - # Check for progress updates - tool_updates = [ - n - for n in client.notifications - if _get_session_update_type(n["update"]) == "tool_call_update" - ] + connection, client, _init_response = acp_tool_notifications + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + + # Call a tool that reports progress + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 5}}' + await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - # Should have received progress updates - assert len(tool_updates) > 0, "Expected tool progress updates" + # Wait for multiple progress updates + await _wait_for_notifications(client, count=7, timeout=5.0) - # Updates should have content with progress messages - updates_with_content = [ - n for n in tool_updates if hasattr(n["update"], "content") and n["update"].content - ] + # Check for progress updates + tool_updates = [ + n + for n in client.notifications + if _get_session_update_type(n["update"]) == "tool_call_update" + ] - # At least some updates should have progress content - # (MCP progress notifications include messages) - assert len(updates_with_content) > 0, "Expected progress updates with content" + # Should have received progress updates + assert len(tool_updates) > 0, "Expected tool progress updates" + + # Updates should have content with progress messages + updates_with_content = [ + n for n in tool_updates if hasattr(n["update"], "content") and n["update"].content + ] + + # At least some updates should have progress content + # (MCP progress notifications include messages) + assert len(updates_with_content) > 0, "Expected progress updates with content" @pytest.mark.integration -@pytest.mark.asyncio -async def test_acp_tool_kinds_inferred() -> None: +async def test_acp_tool_kinds_inferred( + acp_tool_notifications: tuple[object, TestClient, object], +) -> None: """Test that tool kinds are properly inferred from tool names.""" - client = TestClient() - - async with spawn_agent_process(lambda _: client, *FAST_AGENT_CMD) as (connection, _process): - # Initialize - await connection.initialize( - protocol_version=1, - client_capabilities=ClientCapabilities( - fs=FileSystemCapability(read_text_file=True, write_text_file=True), - terminal=False, - ), - client_info=Implementation(name="pytest-client", version="0.0.1"), - ) - - # Create session - session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) - session_id = _get_session_id(session_response) - - # Call a tool - progress_task should be inferred as "other" - tool_name = create_namespaced_name("progress_test", "progress_task") - prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 2}}' - await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) - - # Wait for notifications - await _wait_for_notifications(client, count=3, timeout=3.0) - - # Find the initial tool_call notification - tool_call_notif = next( - ( - n - for n in client.notifications - if _get_session_update_type(n["update"]) == "tool_call" - ), - None, - ) - - assert tool_call_notif is not None, "Expected tool_call notification" - assert hasattr(tool_call_notif["update"], "kind") - # progress_task doesn't match any specific pattern, should be "other" - assert tool_call_notif["update"].kind == "other" + connection, client, _init_response = acp_tool_notifications + + # Create session + session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) + session_id = _get_session_id(session_response) + + # Call a tool - progress_task should be inferred as "other" + tool_name = create_namespaced_name("progress_test", "progress_task") + prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 2}}' + await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + + # Wait for notifications + await _wait_for_notifications(client, count=3, timeout=3.0) + + # Find the initial tool_call notification + tool_call_notif = next( + ( + n + for n in client.notifications + if _get_session_update_type(n["update"]) == "tool_call" + ), + None, + ) + + assert tool_call_notif is not None, "Expected tool_call notification" + assert hasattr(tool_call_notif["update"], "kind") + # progress_task doesn't match any specific pattern, should be "other" + assert tool_call_notif["update"].kind == "other" async def _wait_for_notifications(client: TestClient, count: int = 1, timeout: float = 2.0) -> None: diff --git a/tests/integration/acp/test_client.py b/tests/integration/acp/test_client.py index 04d245ac1..bfa08b67b 100644 --- a/tests/integration/acp/test_client.py +++ b/tests/integration/acp/test_client.py @@ -44,6 +44,15 @@ def __init__(self) -> None: self.terminals: dict[str, dict[str, Any]] = {} self._terminal_count: int = 0 # For generating terminal IDs like real clients + def reset(self) -> None: + self.permission_outcomes.clear() + self.files.clear() + self.notifications.clear() + self.ext_calls.clear() + self.ext_notes.clear() + self.terminals.clear() + self._terminal_count = 0 + def queue_permission_cancelled(self) -> None: self.permission_outcomes.append( RequestPermissionResponse(outcome=DeniedOutcome(outcome="cancelled")) From b7d4d05e1a35be0ce653660d77a78e3646bae725 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 31 Dec 2025 13:21:50 +0000 Subject: [PATCH 2/7] type & lint --- tests/integration/acp/conftest.py | 95 ++++++++++++------- tests/integration/acp/test_acp_basic.py | 37 +++++--- .../acp/test_acp_content_blocks.py | 17 ++-- .../acp/test_acp_filesystem_toolcall.py | 34 ++++--- tests/integration/acp/test_acp_permissions.py | 28 ++++-- .../acp/test_acp_runtime_telemetry.py | 15 +-- tests/integration/acp/test_acp_status_line.py | 11 ++- tests/integration/acp/test_acp_terminal.py | 30 +++--- .../acp/test_acp_tool_notifications.py | 15 +-- 9 files changed, 179 insertions(+), 103 deletions(-) diff --git a/tests/integration/acp/conftest.py b/tests/integration/acp/conftest.py index 47d74706b..a7d7d6593 100644 --- a/tests/integration/acp/conftest.py +++ b/tests/integration/acp/conftest.py @@ -3,10 +3,10 @@ import sys from contextlib import asynccontextmanager from pathlib import Path -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, AsyncIterator import pytest_asyncio -from acp.schema import ClientCapabilities, FileSystemCapability, Implementation +from acp.schema import ClientCapabilities, FileSystemCapability, Implementation, InitializeResponse from acp.stdio import spawn_agent_process TEST_DIR = Path(__file__).parent @@ -15,6 +15,9 @@ from test_client import TestClient # noqa: E402 +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + CONFIG_PATH = TEST_DIR / "fastagent.config.yaml" @@ -57,7 +60,7 @@ async def _spawn_initialized_agent( fs_write: bool = True, client_name: str = "pytest-client", client_version: str = "0.0.1", -) -> AsyncIterator[tuple[Any, TestClient, Any]]: +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: client = TestClient() async with spawn_agent_process(lambda _: client, *cmd) as (connection, _process): init_response = await connection.initialize( @@ -72,7 +75,7 @@ async def _spawn_initialized_agent( @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_basic_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_basic_process() -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: cmd = _fast_agent_cmd("fast-agent-acp-test") async with _spawn_initialized_agent(cmd, terminal=False) as harness: yield harness @@ -80,15 +83,17 @@ async def acp_basic_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: @pytest_asyncio.fixture async def acp_basic( - acp_basic_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_basic_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_basic_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_content_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_content_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd("fast-agent-acp-content-test") async with _spawn_initialized_agent(cmd, terminal=False) as harness: yield harness @@ -96,15 +101,17 @@ async def acp_content_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: @pytest_asyncio.fixture async def acp_content( - acp_content_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_content_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_content_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_filesystem_toolcall_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_filesystem_toolcall_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-filesystem-toolcall-test", no_permissions=True, @@ -119,15 +126,17 @@ async def acp_filesystem_toolcall_process() -> AsyncIterator[tuple[Any, TestClie @pytest_asyncio.fixture async def acp_filesystem_toolcall( - acp_filesystem_toolcall_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_filesystem_toolcall_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_filesystem_toolcall_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_permissions_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_permissions_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-test", servers=("progress_test",), @@ -138,15 +147,17 @@ async def acp_permissions_process() -> AsyncIterator[tuple[Any, TestClient, Any] @pytest_asyncio.fixture async def acp_permissions( - acp_permissions_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_permissions_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_permissions_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_permissions_no_perms_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_permissions_no_perms_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-test", servers=("progress_test",), @@ -158,15 +169,17 @@ async def acp_permissions_no_perms_process() -> AsyncIterator[tuple[Any, TestCli @pytest_asyncio.fixture async def acp_permissions_no_perms( - acp_permissions_no_perms_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_permissions_no_perms_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_permissions_no_perms_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_runtime_telemetry_shell_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_runtime_telemetry_shell_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-runtime-telemetry-test", no_permissions=True, @@ -182,15 +195,17 @@ async def acp_runtime_telemetry_shell_process() -> AsyncIterator[tuple[Any, Test @pytest_asyncio.fixture async def acp_runtime_telemetry_shell( - acp_runtime_telemetry_shell_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_runtime_telemetry_shell_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_runtime_telemetry_shell_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_runtime_telemetry_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_runtime_telemetry_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-runtime-telemetry-test", no_permissions=True, @@ -205,15 +220,17 @@ async def acp_runtime_telemetry_process() -> AsyncIterator[tuple[Any, TestClient @pytest_asyncio.fixture async def acp_runtime_telemetry( - acp_runtime_telemetry_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_runtime_telemetry_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_runtime_telemetry_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_tool_notifications_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_tool_notifications_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-test", servers=("progress_test",), @@ -225,15 +242,17 @@ async def acp_tool_notifications_process() -> AsyncIterator[tuple[Any, TestClien @pytest_asyncio.fixture async def acp_tool_notifications( - acp_tool_notifications_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_tool_notifications_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_tool_notifications_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_terminal_shell_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_terminal_shell_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-terminal-test", shell=True, @@ -248,15 +267,17 @@ async def acp_terminal_shell_process() -> AsyncIterator[tuple[Any, TestClient, A @pytest_asyncio.fixture async def acp_terminal_shell( - acp_terminal_shell_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_terminal_shell_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_terminal_shell_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_terminal_no_shell_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_terminal_no_shell_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd("fast-agent-acp-terminal-test") async with _spawn_initialized_agent( cmd, @@ -268,15 +289,17 @@ async def acp_terminal_no_shell_process() -> AsyncIterator[tuple[Any, TestClient @pytest_asyncio.fixture async def acp_terminal_no_shell( - acp_terminal_no_shell_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_terminal_no_shell_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_terminal_no_shell_process client.reset() yield connection, client, init_response @pytest_asyncio.fixture(scope="module", loop_scope="module") -async def acp_terminal_client_unsupported_process() -> AsyncIterator[tuple[Any, TestClient, Any]]: +async def acp_terminal_client_unsupported_process() -> AsyncIterator[ + tuple[ClientSideConnection, TestClient, InitializeResponse] +]: cmd = _fast_agent_cmd( "fast-agent-acp-terminal-test", shell=True, @@ -291,8 +314,8 @@ async def acp_terminal_client_unsupported_process() -> AsyncIterator[tuple[Any, @pytest_asyncio.fixture async def acp_terminal_client_unsupported( - acp_terminal_client_unsupported_process: tuple[Any, TestClient, Any], -) -> AsyncIterator[tuple[Any, TestClient, Any]]: + acp_terminal_client_unsupported_process: tuple[ClientSideConnection, TestClient, InitializeResponse], +) -> AsyncIterator[tuple[ClientSideConnection, TestClient, InitializeResponse]]: connection, client, init_response = acp_terminal_client_unsupported_process client.reset() yield connection, client, init_response diff --git a/tests/integration/acp/test_acp_basic.py b/tests/integration/acp/test_acp_basic.py index 57ab18820..4a3086055 100644 --- a/tests/integration/acp/test_acp_basic.py +++ b/tests/integration/acp/test_acp_basic.py @@ -3,17 +3,20 @@ import asyncio import sys from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import pytest from acp.helpers import text_block -from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 + +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from acp.schema import InitializeResponse, StopReason + from test_client import TestClient pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -22,14 +25,18 @@ @pytest.mark.integration async def test_acp_initialize_and_prompt_roundtrip( - acp_basic: tuple[object, TestClient, object], + acp_basic: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Ensure the ACP transport initializes, creates a session, and echoes prompts.""" connection, client, init_response = acp_basic assert init_response.protocol_version == 1 assert init_response.agent_capabilities is not None - assert init_response.agent_info.name == "fast-agent-acp-test" + agent_info = getattr(init_response, "agent_info", None) or getattr( + init_response, "agentInfo", None + ) + assert agent_info is not None + assert agent_info.name == "fast-agent-acp-test" # AgentCapabilities schema changed upstream; ensure we advertised prompt support. prompt_caps = getattr(init_response.agent_capabilities, "prompts", None) or getattr( init_response.agent_capabilities, "prompt_capabilities", None @@ -84,14 +91,20 @@ async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> N raise AssertionError("Expected streamed session updates") +def _get_stop_reason(response: object) -> str | None: + return getattr(response, "stop_reason", None) or getattr(response, "stopReason", None) + + @pytest.mark.integration async def test_acp_session_modes_included_in_new_session( - acp_basic: tuple[object, TestClient, object], + acp_basic: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that session/new response includes modes field.""" connection, _client, init_response = acp_basic - assert init_response.protocolVersion == 1 + assert getattr(init_response, "protocol_version", None) == 1 or getattr( + init_response, "protocolVersion", None + ) == 1 # Create session session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) @@ -116,7 +129,7 @@ async def test_acp_session_modes_included_in_new_session( @pytest.mark.integration async def test_acp_overlapping_prompts_are_refused( - acp_basic: tuple[object, TestClient, object], + acp_basic: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """ Test that overlapping prompt requests for the same session are refused. @@ -127,7 +140,9 @@ async def test_acp_overlapping_prompts_are_refused( """ connection, _client, init_response = acp_basic - assert init_response.protocolVersion == 1 + assert getattr(init_response, "protocol_version", None) == 1 or getattr( + init_response, "protocolVersion", None + ) == 1 # Create session session_response = await connection.new_session(mcp_servers=[], cwd=str(TEST_DIR)) @@ -150,7 +165,7 @@ async def test_acp_overlapping_prompts_are_refused( # One should succeed, one should be refused # (We don't know which one arrives first due to async scheduling) - responses = [prompt1_response.stopReason, prompt2_response.stopReason] + responses = [_get_stop_reason(prompt1_response), _get_stop_reason(prompt2_response)] assert "end_turn" in responses, "One prompt should succeed" assert "refusal" in responses, "One prompt should be refused" @@ -158,4 +173,4 @@ async def test_acp_overlapping_prompts_are_refused( prompt3_response = await connection.prompt( session_id=session_id, prompt=[text_block("third prompt")] ) - assert prompt3_response.stopReason == END_TURN + assert _get_stop_reason(prompt3_response) == END_TURN diff --git a/tests/integration/acp/test_acp_content_blocks.py b/tests/integration/acp/test_acp_content_blocks.py index 7997f4834..55e15ddbf 100644 --- a/tests/integration/acp/test_acp_content_blocks.py +++ b/tests/integration/acp/test_acp_content_blocks.py @@ -10,6 +10,7 @@ import base64 import sys from pathlib import Path +from typing import TYPE_CHECKING import pytest from acp.helpers import text_block @@ -17,15 +18,19 @@ BlobResourceContents, EmbeddedResourceContentBlock, ImageContentBlock, + InitializeResponse, StopReason, TextResourceContents, ) +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from test_client import TestClient + TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -34,7 +39,7 @@ @pytest.mark.integration async def test_acp_image_content_processing( - acp_content: tuple[object, TestClient, object], + acp_content: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that image content blocks are properly processed.""" connection, client, init_response = acp_content @@ -91,7 +96,7 @@ async def test_acp_image_content_processing( @pytest.mark.integration async def test_acp_embedded_text_resource_processing( - acp_content: tuple[object, TestClient, object], + acp_content: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that embedded text resource content blocks are properly processed.""" connection, client, init_response = acp_content @@ -151,7 +156,7 @@ async def test_acp_embedded_text_resource_processing( @pytest.mark.integration async def test_acp_embedded_blob_resource_processing( - acp_content: tuple[object, TestClient, object], + acp_content: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that embedded blob resource content blocks are properly processed.""" connection, client, _init_response = acp_content @@ -195,7 +200,7 @@ async def test_acp_embedded_blob_resource_processing( @pytest.mark.integration async def test_acp_mixed_content_blocks( - acp_content: tuple[object, TestClient, object], + acp_content: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that mixed content blocks (text, image, resource) work together.""" connection, client, _init_response = acp_content @@ -246,7 +251,7 @@ async def test_acp_mixed_content_blocks( @pytest.mark.integration async def test_acp_resource_only_prompt_not_slash_command( - acp_content: tuple[object, TestClient, object], + acp_content: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """ Test that resource-only prompts with text starting with "/" are not treated as slash commands. diff --git a/tests/integration/acp/test_acp_filesystem_toolcall.py b/tests/integration/acp/test_acp_filesystem_toolcall.py index a7a153767..d1e04082c 100644 --- a/tests/integration/acp/test_acp_filesystem_toolcall.py +++ b/tests/integration/acp/test_acp_filesystem_toolcall.py @@ -5,16 +5,20 @@ import asyncio import sys from pathlib import Path +from typing import TYPE_CHECKING import pytest from acp.helpers import text_block -from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 + +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from acp.schema import InitializeResponse, StopReason + from test_client import TestClient pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -42,7 +46,7 @@ async def _wait_for_notifications(client: TestClient, timeout: float = 2.0) -> N @pytest.mark.integration async def test_acp_filesystem_read_tool_call( - acp_filesystem_toolcall: tuple[object, TestClient, object], + acp_filesystem_toolcall: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that read_text_file tool can be called via passthrough model.""" connection, client, init_response = acp_filesystem_toolcall @@ -52,13 +56,13 @@ async def test_acp_filesystem_read_tool_call( test_content = "Hello from test file!" client.files[test_path] = test_content - assert getattr(init_response, "protocol_version", None) == 1 or getattr( - init_response, "protocolVersion", None - ) == 1 + assert ( + getattr(init_response, "protocol_version", None) == 1 + or getattr(init_response, "protocolVersion", None) == 1 + ) assert ( getattr(init_response, "agent_capabilities", None) - or getattr(init_response, "agentCapabilities", None) - is not None + or getattr(init_response, "agentCapabilities", None) is not None ) # Create session @@ -68,7 +72,9 @@ async def test_acp_filesystem_read_tool_call( # Use passthrough model's ***CALL_TOOL directive to invoke read_text_file prompt_text = f'***CALL_TOOL read_text_file {{"path": "{test_path}"}}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) # Should complete successfully assert _get_stop_reason(prompt_response) == END_TURN @@ -87,7 +93,7 @@ async def test_acp_filesystem_read_tool_call( @pytest.mark.integration async def test_acp_filesystem_write_tool_call( - acp_filesystem_toolcall: tuple[object, TestClient, object], + acp_filesystem_toolcall: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that write_text_file tool can be called via passthrough model.""" connection, client, _init_response = acp_filesystem_toolcall @@ -100,9 +106,13 @@ async def test_acp_filesystem_write_tool_call( # Use passthrough model's ***CALL_TOOL directive to invoke write_text_file test_path = "/test/output.txt" test_content = "Test content from tool call" - prompt_text = f'***CALL_TOOL write_text_file {{"path": "{test_path}", "content": "{test_content}"}}' + prompt_text = ( + f'***CALL_TOOL write_text_file {{"path": "{test_path}", "content": "{test_content}"}}' + ) - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) # Should complete successfully assert _get_stop_reason(prompt_response) == END_TURN diff --git a/tests/integration/acp/test_acp_permissions.py b/tests/integration/acp/test_acp_permissions.py index 43de0fe6a..14ea74ce9 100644 --- a/tests/integration/acp/test_acp_permissions.py +++ b/tests/integration/acp/test_acp_permissions.py @@ -11,10 +11,10 @@ import sys import tempfile from pathlib import Path +from typing import TYPE_CHECKING import pytest from acp.helpers import text_block -from acp.schema import StopReason from fast_agent.mcp.common import create_namespaced_name @@ -22,7 +22,11 @@ if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 + +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from acp.schema import InitializeResponse, StopReason + from test_client import TestClient pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -77,7 +81,7 @@ def _tool_was_denied(client: TestClient) -> bool: @pytest.mark.integration async def test_permission_request_sent_when_tool_called( - acp_permissions: tuple[object, TestClient, object], + acp_permissions: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that a permission request is sent when a tool is called.""" connection, client, _init_response = acp_permissions @@ -91,7 +95,9 @@ async def test_permission_request_sent_when_tool_called( # Send a prompt that will trigger a tool call tool_name = create_namespaced_name("progress_test", "progress_task") prompt_text = f'***CALL_TOOL {tool_name} {{"steps": 1}}' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) # The tool should have been denied (permission cancelled) assert _get_stop_reason(prompt_response) == END_TURN @@ -107,7 +113,7 @@ async def test_permission_request_sent_when_tool_called( @pytest.mark.integration async def test_allow_once_permits_execution_without_persistence( - acp_permissions: tuple[object, TestClient, object], + acp_permissions: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that allow_once permits execution but doesn't persist.""" connection, client, _init_response = acp_permissions @@ -142,7 +148,7 @@ async def test_allow_once_permits_execution_without_persistence( @pytest.mark.integration async def test_allow_always_persists( - acp_permissions: tuple[object, TestClient, object], + acp_permissions: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that allow_always permits execution and persists.""" connection, client, _init_response = acp_permissions @@ -180,7 +186,7 @@ async def test_allow_always_persists( @pytest.mark.integration async def test_reject_once_blocks_without_persistence( - acp_permissions: tuple[object, TestClient, object], + acp_permissions: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that reject_once blocks execution but doesn't persist.""" connection, client, _init_response = acp_permissions @@ -217,7 +223,7 @@ async def test_reject_once_blocks_without_persistence( @pytest.mark.integration async def test_reject_always_blocks_and_persists( - acp_permissions: tuple[object, TestClient, object], + acp_permissions: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that reject_always blocks execution and persists.""" connection, client, _init_response = acp_permissions @@ -257,7 +263,7 @@ async def test_reject_always_blocks_and_persists( @pytest.mark.integration async def test_no_permissions_flag_disables_checks( - acp_permissions_no_perms: tuple[object, TestClient, object], + acp_permissions_no_perms: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that --no-permissions flag allows all tool executions.""" connection, client, _init_response = acp_permissions_no_perms @@ -281,4 +287,6 @@ async def test_no_permissions_flag_disables_checks( await _wait_for_notifications(client, count=3, timeout=3.0) # Tool should have executed successfully without needing permission - assert _tool_executed_successfully(client), "Tool should have executed with --no-permissions flag" + assert _tool_executed_successfully(client), ( + "Tool should have executed with --no-permissions flag" + ) diff --git a/tests/integration/acp/test_acp_runtime_telemetry.py b/tests/integration/acp/test_acp_runtime_telemetry.py index 13f4c0154..5ef22580a 100644 --- a/tests/integration/acp/test_acp_runtime_telemetry.py +++ b/tests/integration/acp/test_acp_runtime_telemetry.py @@ -10,17 +10,20 @@ import asyncio import sys from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import pytest from acp.helpers import text_block -from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 + +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from acp.schema import InitializeResponse, StopReason + from test_client import TestClient pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -58,7 +61,7 @@ async def _wait_for_notifications(client: TestClient, count: int = 1, timeout: f @pytest.mark.integration async def test_acp_terminal_runtime_telemetry( - acp_runtime_telemetry_shell: tuple[object, TestClient, object], + acp_runtime_telemetry_shell: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that terminal execute operations trigger tool call notifications.""" connection, client, init_response = acp_runtime_telemetry_shell @@ -113,7 +116,7 @@ async def test_acp_terminal_runtime_telemetry( @pytest.mark.integration async def test_acp_filesystem_read_runtime_telemetry( - acp_runtime_telemetry: tuple[object, TestClient, object], + acp_runtime_telemetry: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that read_text_file operations trigger tool call notifications.""" connection, client, _init_response = acp_runtime_telemetry @@ -163,7 +166,7 @@ async def test_acp_filesystem_read_runtime_telemetry( @pytest.mark.integration async def test_acp_filesystem_write_runtime_telemetry( - acp_runtime_telemetry: tuple[object, TestClient, object], + acp_runtime_telemetry: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that write_text_file operations trigger tool call notifications.""" connection, client, _init_response = acp_runtime_telemetry diff --git a/tests/integration/acp/test_acp_status_line.py b/tests/integration/acp/test_acp_status_line.py index 4a0b58676..112ec99e5 100644 --- a/tests/integration/acp/test_acp_status_line.py +++ b/tests/integration/acp/test_acp_status_line.py @@ -4,17 +4,20 @@ import re import sys from pathlib import Path -from typing import cast +from typing import TYPE_CHECKING, cast import pytest from acp.helpers import text_block -from acp.schema import StopReason TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 + +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from acp.schema import InitializeResponse, StopReason + from test_client import TestClient pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -54,7 +57,7 @@ async def _wait_for_status_line(client: TestClient, timeout: float = 2.0) -> str @pytest.mark.integration async def test_acp_status_line_meta_is_emitted( - acp_basic: tuple[object, TestClient, object], + acp_basic: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: connection, client, _init_response = acp_basic diff --git a/tests/integration/acp/test_acp_terminal.py b/tests/integration/acp/test_acp_terminal.py index 5711ddfdd..32ebeda64 100644 --- a/tests/integration/acp/test_acp_terminal.py +++ b/tests/integration/acp/test_acp_terminal.py @@ -5,16 +5,20 @@ import asyncio import sys from pathlib import Path +from typing import TYPE_CHECKING import pytest from acp.helpers import text_block -from acp.schema import StopReason + +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from acp.schema import InitializeResponse, StopReason + from test_client import TestClient TEST_DIR = Path(__file__).parent if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -31,18 +35,18 @@ def _get_stop_reason(response: object) -> str | None: @pytest.mark.integration async def test_acp_terminal_support_enabled( - acp_terminal_shell: tuple[object, TestClient, object], + acp_terminal_shell: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that terminal support is properly enabled when client advertises capability.""" connection, client, init_response = acp_terminal_shell - assert getattr(init_response, "protocol_version", None) == 1 or getattr( - init_response, "protocolVersion", None - ) == 1 + assert ( + getattr(init_response, "protocol_version", None) == 1 + or getattr(init_response, "protocolVersion", None) == 1 + ) assert ( getattr(init_response, "agent_capabilities", None) - or getattr(init_response, "agentCapabilities", None) - is not None + or getattr(init_response, "agentCapabilities", None) is not None ) # Create session @@ -53,7 +57,9 @@ async def test_acp_terminal_support_enabled( # Send prompt that should trigger terminal execution # The passthrough model will echo our input, so we craft a tool call request prompt_text = 'use the execute tool to run: echo "test terminal"' - prompt_response = await connection.prompt(session_id=session_id, prompt=[text_block(prompt_text)]) + prompt_response = await connection.prompt( + session_id=session_id, prompt=[text_block(prompt_text)] + ) assert _get_stop_reason(prompt_response) == END_TURN # Wait for any notifications @@ -65,7 +71,7 @@ async def test_acp_terminal_support_enabled( @pytest.mark.integration async def test_acp_terminal_execution( - acp_terminal_shell: tuple[object, TestClient, object], + acp_terminal_shell: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test actual terminal command execution via ACP.""" connection, client, _init_response = acp_terminal_shell @@ -103,7 +109,7 @@ async def test_acp_terminal_execution( @pytest.mark.integration async def test_acp_terminal_disabled_when_no_shell_flag( - acp_terminal_no_shell: tuple[object, TestClient, object], + acp_terminal_no_shell: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that terminal runtime is not injected when --shell flag is not provided.""" connection, _client, _init_response = acp_terminal_no_shell @@ -120,7 +126,7 @@ async def test_acp_terminal_disabled_when_no_shell_flag( @pytest.mark.integration async def test_acp_terminal_disabled_when_client_unsupported( - acp_terminal_client_unsupported: tuple[object, TestClient, object], + acp_terminal_client_unsupported: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that terminal runtime is not used when client doesn't support terminals.""" connection, _client, _init_response = acp_terminal_client_unsupported diff --git a/tests/integration/acp/test_acp_tool_notifications.py b/tests/integration/acp/test_acp_tool_notifications.py index 4d5e0feaf..d4d28d6a6 100644 --- a/tests/integration/acp/test_acp_tool_notifications.py +++ b/tests/integration/acp/test_acp_tool_notifications.py @@ -10,11 +10,10 @@ import asyncio import sys from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import pytest from acp.helpers import text_block -from acp.schema import StopReason from fast_agent.mcp.common import create_namespaced_name @@ -22,7 +21,11 @@ if str(TEST_DIR) not in sys.path: sys.path.append(str(TEST_DIR)) -from test_client import TestClient # noqa: E402 + +if TYPE_CHECKING: + from acp.client.connection import ClientSideConnection + from acp.schema import InitializeResponse, StopReason + from test_client import TestClient pytestmark = pytest.mark.asyncio(loop_scope="module") @@ -48,7 +51,7 @@ def _get_session_update_type(update: Any) -> str | None: @pytest.mark.integration async def test_acp_tool_call_notifications( - acp_tool_notifications: tuple[object, TestClient, object], + acp_tool_notifications: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that tool calls generate appropriate ACP notifications.""" connection, client, init_response = acp_tool_notifications @@ -108,7 +111,7 @@ async def test_acp_tool_call_notifications( @pytest.mark.integration async def test_acp_tool_progress_updates( - acp_tool_notifications: tuple[object, TestClient, object], + acp_tool_notifications: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that tool progress updates are sent via tool_call_update notifications.""" connection, client, _init_response = acp_tool_notifications @@ -147,7 +150,7 @@ async def test_acp_tool_progress_updates( @pytest.mark.integration async def test_acp_tool_kinds_inferred( - acp_tool_notifications: tuple[object, TestClient, object], + acp_tool_notifications: tuple[ClientSideConnection, TestClient, InitializeResponse], ) -> None: """Test that tool kinds are properly inferred from tool names.""" connection, client, _init_response = acp_tool_notifications From 71491cd9ede6fb831c64edad6185ac28d69159eb Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 31 Dec 2025 13:43:11 +0000 Subject: [PATCH 3/7] further test optimisation --- .../api/fastagent.config.markup.yaml | 2 +- tests/integration/api/fastagent.config.yaml | 4 +- .../api/test_cli_and_mcp_server.py | 34 +++------ tests/integration/conftest.py | 75 ++++++++++++++++++- .../prompt-server/fastagent.config.yaml | 4 +- .../test_prompt_server_integration.py | 16 ++-- 6 files changed, 92 insertions(+), 43 deletions(-) diff --git a/tests/integration/api/fastagent.config.markup.yaml b/tests/integration/api/fastagent.config.markup.yaml index ec1f24238..2939591c2 100644 --- a/tests/integration/api/fastagent.config.markup.yaml +++ b/tests/integration/api/fastagent.config.markup.yaml @@ -40,7 +40,7 @@ mcp: args: ["run", "integration_agent.py", "--transport", "stdio"] sse: transport: "sse" - url: "http://localhost:8723/sse" + url: "http://localhost:${FAST_AGENT_TEST_SSE_PORT:8723}/sse" card_test: command: "uv" args: ["run", "mcp_tools_server.py"] diff --git a/tests/integration/api/fastagent.config.yaml b/tests/integration/api/fastagent.config.yaml index 88a1b22af..cd6a77ac7 100644 --- a/tests/integration/api/fastagent.config.yaml +++ b/tests/integration/api/fastagent.config.yaml @@ -42,10 +42,10 @@ mcp: args: ["run", "integration_agent.py", "--transport", "stdio"] sse: transport: "sse" - url: "http://localhost:8723/sse" + url: "http://localhost:${FAST_AGENT_TEST_SSE_PORT:8723}/sse" http: transport: "http" - url: "http://localhost:8724/mcp" + url: "http://localhost:${FAST_AGENT_TEST_HTTP_PORT:8724}/mcp" card_test: command: "uv" args: ["run", "mcp_tools_server.py"] diff --git a/tests/integration/api/test_cli_and_mcp_server.py b/tests/integration/api/test_cli_and_mcp_server.py index 19fcd06ee..f3f7968c9 100644 --- a/tests/integration/api/test_cli_and_mcp_server.py +++ b/tests/integration/api/test_cli_and_mcp_server.py @@ -1,4 +1,3 @@ -import asyncio import os import subprocess from typing import TYPE_CHECKING @@ -151,7 +150,7 @@ async def agent_function(): @pytest.mark.integration @pytest.mark.asyncio -async def test_agent_transport_option_sse(fast_agent): +async def test_agent_transport_option_sse(fast_agent, mcp_test_ports, wait_for_port): """Test that FastAgent enables server mode when --transport is provided (SSE).""" # Start the SSE server in a subprocess @@ -163,7 +162,7 @@ async def test_agent_transport_option_sse(fast_agent): test_agent_path = os.path.join(test_dir, "integration_agent.py") # Port must match what's in the fastagent.config.yaml - port = 8723 + port = mcp_test_ports["sse"] # Start the server process server_proc = subprocess.Popen( @@ -184,8 +183,7 @@ async def test_agent_transport_option_sse(fast_agent): ) try: - # Give the server a moment to start - await asyncio.sleep(2) + await wait_for_port("127.0.0.1", port, process=server_proc) # Now connect to it via the configured MCP server @fast_agent.agent(name="client", servers=["sse"]) @@ -210,7 +208,7 @@ async def agent_function(): @pytest.mark.integration @pytest.mark.asyncio -async def test_serve_request_scope_disables_session_header(): +async def test_serve_request_scope_disables_session_header(mcp_test_ports, wait_for_port): """Request-scoped instances should not advertise an MCP session id.""" import os @@ -219,7 +217,7 @@ async def test_serve_request_scope_disables_session_header(): test_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(test_dir, "fastagent.config.yaml") - port = 8731 + port = mcp_test_ports["request_http"] server_proc = subprocess.Popen( [ @@ -245,20 +243,7 @@ async def test_serve_request_scope_disables_session_header(): ) try: - # Wait until the server is listening - for _ in range(40): - if server_proc.poll() is not None: - stdout, stderr = server_proc.communicate(timeout=1) - raise AssertionError(f"Server exited early. stdout={stdout} stderr={stderr}") - try: - reader, writer = await asyncio.open_connection("127.0.0.1", port) - writer.close() - await writer.wait_closed() - break - except OSError: - await asyncio.sleep(0.25) - else: - raise AssertionError("Server did not start listening in time") + await wait_for_port("127.0.0.1", port, process=server_proc, timeout=10.0) async with httpx.AsyncClient(timeout=5.0) as client: init_payload = { @@ -293,7 +278,7 @@ async def test_serve_request_scope_disables_session_header(): @pytest.mark.integration @pytest.mark.asyncio -async def test_agent_server_option_http(fast_agent): +async def test_agent_server_option_http(fast_agent, mcp_test_ports, wait_for_port): """Test that FastAgent still accepts the legacy --server flag with HTTP transport.""" # Start the SSE server in a subprocess @@ -305,7 +290,7 @@ async def test_agent_server_option_http(fast_agent): test_agent_path = os.path.join(test_dir, "integration_agent.py") # Port must match what's in the fastagent.config.yaml - port = 8724 + port = mcp_test_ports["http"] # Start the server process server_proc = subprocess.Popen( @@ -327,8 +312,7 @@ async def test_agent_server_option_http(fast_agent): ) try: - # Give the server a moment to start - await asyncio.sleep(2) + await wait_for_port("127.0.0.1", port, process=server_proc) # Now connect to it via the configured MCP server @fast_agent.agent(name="client", servers=["http"]) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6628583c5..3541be927 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,3 +1,4 @@ +import asyncio import importlib import os from pathlib import Path @@ -27,6 +28,74 @@ def cleanup_event_bus(): pass +@pytest.fixture(scope="session") +def mcp_test_ports(): + worker_id = os.getenv("PYTEST_XDIST_WORKER", "") + worker_index = 0 + if worker_id.startswith("gw"): + suffix = worker_id[2:] + if suffix.isdigit(): + worker_index = int(suffix) + + stride = int(os.getenv("FAST_AGENT_TEST_PORT_STRIDE", "10")) + offset = worker_index * stride + ports = { + "sse": 8723 + offset, + "http": 8724 + offset, + "request_http": 8731 + offset, + } + + env_updates = { + "FAST_AGENT_TEST_SSE_PORT": str(ports["sse"]), + "FAST_AGENT_TEST_HTTP_PORT": str(ports["http"]), + } + previous = {key: os.getenv(key) for key in env_updates} + os.environ.update(env_updates) + + yield ports + + for key, value in previous.items(): + if value is None: + os.environ.pop(key, None) + else: + os.environ[key] = value + + +@pytest.fixture +def wait_for_port(): + async def _wait_for_port( + host: str, + port: int, + *, + process=None, + timeout: float = 5.0, + interval: float = 0.1, + ) -> None: + deadline = asyncio.get_running_loop().time() + timeout + while True: + if process is not None and process.poll() is not None: + stdout = "" + stderr = "" + if process.stdout or process.stderr: + stdout, stderr = process.communicate(timeout=1) + raise AssertionError( + f"Server exited early. stdout={stdout!r} stderr={stderr!r}" + ) + try: + reader, writer = await asyncio.open_connection(host, port) + writer.close() + await writer.wait_closed() + return + except OSError: + if asyncio.get_running_loop().time() >= deadline: + raise AssertionError( + f"Server did not start listening on {host}:{port}" + ) + await asyncio.sleep(interval) + + return _wait_for_port + + # Set the project root directory for tests @pytest.fixture(scope="session") def project_root(): @@ -37,7 +106,7 @@ def project_root(): # Add a fixture that uses the test file's directory @pytest.fixture -def fast_agent(request): +def fast_agent(request, mcp_test_ports): """ Creates a FastAgent with config from the test file's directory. Automatically changes working directory to match the test file location. @@ -71,7 +140,7 @@ def fast_agent(request): # Add a fixture that uses the test file's directory @pytest.fixture -def markup_fast_agent(request): +def markup_fast_agent(request, mcp_test_ports): """ Creates a FastAgent with config from the test file's directory. Automatically changes working directory to match the test file location. @@ -105,7 +174,7 @@ def markup_fast_agent(request): # Add a fixture for auto_sampling disabled tests @pytest.fixture -def auto_sampling_off_fast_agent(request): +def auto_sampling_off_fast_agent(request, mcp_test_ports): """ Creates a FastAgent with auto_sampling disabled config from the test file's directory. """ diff --git a/tests/integration/prompt-server/fastagent.config.yaml b/tests/integration/prompt-server/fastagent.config.yaml index 3dbdadda4..822d29e59 100644 --- a/tests/integration/prompt-server/fastagent.config.yaml +++ b/tests/integration/prompt-server/fastagent.config.yaml @@ -28,7 +28,7 @@ mcp: ] prompt_sse: transport: "sse" - url: "http://localhost:8723/sse" + url: "http://localhost:${FAST_AGENT_TEST_SSE_PORT:8723}/sse" prompt_http: transport: "http" - url: "http://localhost:8724/mcp" + url: "http://localhost:${FAST_AGENT_TEST_HTTP_PORT:8724}/mcp" diff --git a/tests/integration/prompt-server/test_prompt_server_integration.py b/tests/integration/prompt-server/test_prompt_server_integration.py index f04798436..9eee0bfe4 100644 --- a/tests/integration/prompt-server/test_prompt_server_integration.py +++ b/tests/integration/prompt-server/test_prompt_server_integration.py @@ -175,16 +175,15 @@ async def agent_function(): @pytest.mark.integration @pytest.mark.asyncio -async def test_prompt_server_sse_can_set_ports(fast_agent): +async def test_prompt_server_sse_can_set_ports(fast_agent, mcp_test_ports, wait_for_port): # Start the SSE server in a subprocess - import asyncio import subprocess # Get the path to the test agent test_dir = Path(__file__).resolve().parent # Port must match what's in the fastagent.config.yaml - port = 8723 + port = mcp_test_ports["sse"] # Start the server process server_proc = subprocess.Popen( @@ -196,8 +195,7 @@ async def test_prompt_server_sse_can_set_ports(fast_agent): ) try: - # Give the server a moment to start - await asyncio.sleep(3) + await wait_for_port("127.0.0.1", port, process=server_proc) # Now connect to it via the configured MCP server @fast_agent.agent(name="client", servers=["prompt_sse"], model="passthrough") @@ -222,16 +220,15 @@ async def agent_function(): @pytest.mark.integration @pytest.mark.asyncio -async def test_prompt_server_http_can_set_ports(fast_agent): +async def test_prompt_server_http_can_set_ports(fast_agent, mcp_test_ports, wait_for_port): # Start the SSE server in a subprocess - import asyncio import subprocess # Get the path to the test agent test_dir = Path(__file__).resolve().parent # Port must match what's in the fastagent.config.yaml - port = 8724 + port = mcp_test_ports["http"] # Start the server process server_proc = subprocess.Popen( @@ -243,8 +240,7 @@ async def test_prompt_server_http_can_set_ports(fast_agent): ) try: - # Give the server a moment to start - await asyncio.sleep(3) + await wait_for_port("127.0.0.1", port, process=server_proc) # Now connect to it via the configured MCP server @fast_agent.agent(name="client", servers=["prompt_http"], model="passthrough") From 9b1209a7d600f3b48e0c77bd964b49da956a5404 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 31 Dec 2025 13:51:15 +0000 Subject: [PATCH 4/7] matrix tests for performance --- .github/workflows/checks.yml | 41 ++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index d0e1358f0..230b7b020 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -51,7 +51,7 @@ jobs: - name: Run ty run: uv run scripts/typecheck.py - test: + unit-test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -70,10 +70,43 @@ jobs: run: | uv sync --group dev - - name: Run pytest + - name: Run unit tests + run: uv run pytest tests/unit -v + + integration-test: + name: Integration (${{ matrix.name }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - name: acp + path: tests/integration/acp + - name: api + path: tests/integration/api + - name: prompt-server + path: tests/integration/prompt-server + - name: other + path: tests/integration/async_tools tests/integration/elicitation tests/integration/history-architecture tests/integration/instruction_templates tests/integration/mcp_filtering tests/integration/mcp_ui tests/integration/prompt-state tests/integration/resources tests/integration/roots tests/integration/sampling tests/integration/sampling_with_tools tests/integration/server_instructions tests/integration/skybridge tests/integration/tool_loop tests/integration/workflow + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v3 + with: + enable-cache: true + + - name: "Set up Python" + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install the project run: | - uv run pytest tests/unit -v - uv run pytest tests/integration -v + uv sync --group dev + + - name: Run integration tests + run: uv run pytest -m integration -v ${{ matrix.path }} package-test: runs-on: ubuntu-latest From d5cebae4168753f3ec0d659f214891d1aae51708 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 31 Dec 2025 14:07:53 +0000 Subject: [PATCH 5/7] update matrix (cli flags) --- tests/integration/conftest.py | 55 +++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 3541be927..59b4e9e9e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,6 +1,7 @@ import asyncio import importlib import os +import sys from pathlib import Path import pytest @@ -124,12 +125,18 @@ def fast_agent(request, mcp_test_ports): # Explicitly create absolute path to the config file in the test directory config_file = os.path.join(test_dir, "fastagent.config.yaml") - # Create agent with local config using absolute path - agent = FastAgent( - "Test Agent", - config_path=config_file, # Use absolute path to local config in test directory - ignore_unknown_args=True, - ) + # Avoid pytest args being parsed as FastAgent CLI args. + original_argv = sys.argv + sys.argv = [sys.argv[0]] + try: + # Create agent with local config using absolute path + agent = FastAgent( + "Test Agent", + config_path=config_file, # Use absolute path to local config in test directory + ignore_unknown_args=True, + ) + finally: + sys.argv = original_argv # Provide the agent yield agent @@ -158,12 +165,18 @@ def markup_fast_agent(request, mcp_test_ports): # Explicitly create absolute path to the config file in the test directory config_file = os.path.join(test_dir, "fastagent.config.markup.yaml") - # Create agent with local config using absolute path - agent = FastAgent( - "Test Agent", - config_path=config_file, # Use absolute path to local config in test directory - ignore_unknown_args=True, - ) + # Avoid pytest args being parsed as FastAgent CLI args. + original_argv = sys.argv + sys.argv = [sys.argv[0]] + try: + # Create agent with local config using absolute path + agent = FastAgent( + "Test Agent", + config_path=config_file, # Use absolute path to local config in test directory + ignore_unknown_args=True, + ) + finally: + sys.argv = original_argv # Provide the agent yield agent @@ -191,12 +204,18 @@ def auto_sampling_off_fast_agent(request, mcp_test_ports): # Explicitly create absolute path to the config file in the test directory config_file = os.path.join(test_dir, "fastagent.config.auto_sampling_off.yaml") - # Create agent with local config using absolute path - agent = FastAgent( - "Test Agent", - config_path=config_file, - ignore_unknown_args=True, - ) + # Avoid pytest args being parsed as FastAgent CLI args. + original_argv = sys.argv + sys.argv = [sys.argv[0]] + try: + # Create agent with local config using absolute path + agent = FastAgent( + "Test Agent", + config_path=config_file, + ignore_unknown_args=True, + ) + finally: + sys.argv = original_argv # Provide the agent yield agent From bd0dd823a073e0a7dd60a9b0afc19cd103af32c0 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 31 Dec 2025 14:09:43 +0000 Subject: [PATCH 6/7] run integrations --- .github/workflows/checks.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 230b7b020..1359d3710 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -82,12 +82,19 @@ jobs: include: - name: acp path: tests/integration/acp + args: "" - name: api path: tests/integration/api + args: "" - name: prompt-server path: tests/integration/prompt-server + args: "" - name: other - path: tests/integration/async_tools tests/integration/elicitation tests/integration/history-architecture tests/integration/instruction_templates tests/integration/mcp_filtering tests/integration/mcp_ui tests/integration/prompt-state tests/integration/resources tests/integration/roots tests/integration/sampling tests/integration/sampling_with_tools tests/integration/server_instructions tests/integration/skybridge tests/integration/tool_loop tests/integration/workflow + path: tests/integration + args: >- + --ignore=tests/integration/acp + --ignore=tests/integration/api + --ignore=tests/integration/prompt-server steps: - uses: actions/checkout@v4 @@ -106,7 +113,7 @@ jobs: uv sync --group dev - name: Run integration tests - run: uv run pytest -m integration -v ${{ matrix.path }} + run: uv run pytest -m integration -v ${{ matrix.path }} ${{ matrix.args }} package-test: runs-on: ubuntu-latest From 90a90da906b6eb7cd1ddd00fb494be351def7c34 Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Wed, 31 Dec 2025 14:15:47 +0000 Subject: [PATCH 7/7] test aggregate job --- .github/workflows/checks.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 1359d3710..c1f966314 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -115,6 +115,16 @@ jobs: - name: Run integration tests run: uv run pytest -m integration -v ${{ matrix.path }} ${{ matrix.args }} + test: + name: test + runs-on: ubuntu-latest + needs: + - unit-test + - integration-test + steps: + - name: Report combined status + run: echo "unit-test and integration-test completed" + package-test: runs-on: ubuntu-latest steps: