diff --git a/docs/docs/using/agents/a2a.md b/docs/docs/using/agents/a2a.md
index 55a5fc7b9..c51c04102 100644
--- a/docs/docs/using/agents/a2a.md
+++ b/docs/docs/using/agents/a2a.md
@@ -206,6 +206,107 @@ Associate A2A agents with virtual servers to:
- **Audit Logging**: All agent interactions are logged
- **Network Security**: HTTPS support and SSL verification
+## Local Testing
+
+### Demo A2A Agent
+
+The repository includes a demo A2A agent with calculator and weather tools for local testing:
+
+```bash
+# Terminal 1: Start ContextForge
+make dev
+
+# Terminal 2: Start the demo agent (auto-registers with ContextForge)
+uv run python scripts/demo_a2a_agent.py
+```
+
+The demo agent supports these query formats:
+
+| Query | Example | Response |
+|-------|---------|----------|
+| Calculator | `calc: 7*8+2` | `58` |
+| Weather | `weather: Dallas` | `The weather in Dallas is sunny, 25C` |
+
+**Test via Admin UI:**
+
+1. Go to `http://localhost:8000/admin`
+2. Click the "A2A Agents" tab
+3. Find "Demo Calculator Agent" and click **Test**
+4. Enter a query like `calc: 100/4+25` in the modal
+5. Click **Run Test** to see the result
+
+**Test via API:**
+
+```bash
+# Get a token
+export TOKEN=$(python -m mcpgateway.utils.create_jwt_token \
+ --username admin@example.com --exp 60 --secret my-test-key)
+
+# Invoke the agent
+curl -X POST "http://localhost:8000/a2a/demo-calculator-agent/invoke" \
+ -H "Authorization: Bearer $TOKEN" \
+ -H "Content-Type: application/json" \
+ -d '{"query": "calc: 15*4+10"}'
+```
+
+### A2A SDK HelloWorld Sample
+
+Test with the official A2A Python SDK sample:
+
+```bash
+# Clone and run the HelloWorld agent
+git clone https://github.com/google/a2a-samples.git
+cd a2a-samples/samples/python/agents/helloworld
+uv run python __main__.py # Starts on port 9999
+
+# Register with ContextForge (in another terminal)
+export TOKEN=$(python -m mcpgateway.utils.create_jwt_token \
+ --username admin@example.com --exp 60 --secret my-test-key)
+
+curl -X POST "http://localhost:8000/a2a" \
+ -H "Authorization: Bearer $TOKEN" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "agent": {
+ "name": "Hello World Agent",
+ "endpoint_url": "http://localhost:9999/",
+ "agent_type": "jsonrpc",
+ "description": "Official A2A SDK HelloWorld sample"
+ },
+ "visibility": "public"
+ }'
+```
+
+### Admin UI Query Input
+
+The Admin UI test button opens a modal where you can enter custom queries:
+
+1. **Open Modal**: Click the blue **Test** button next to any A2A agent
+2. **Enter Query**: Type your query in the textarea (e.g., `calc: 5*10+2`)
+3. **Run Test**: Click **Run Test** to send the query to the agent
+4. **View Response**: The agent's response appears in the modal
+
+This allows testing A2A agents with real user queries instead of hardcoded test messages.
+
+### Testing via MCP Tools
+
+A2A agents are automatically exposed as MCP tools. After registration, invoke them via the MCP protocol:
+
+```bash
+curl -X POST "http://localhost:8000/rpc" \
+ -H "Authorization: Bearer $TOKEN" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "jsonrpc": "2.0",
+ "method": "tools/call",
+ "params": {
+ "name": "a2a_demo-calculator-agent",
+ "arguments": {"query": "calc: 99+1"}
+ },
+ "id": 1
+ }'
+```
+
## Troubleshooting
### Agent Not Responding
diff --git a/mcpgateway/admin.py b/mcpgateway/admin.py
index 4e6440735..eb853a9b3 100644
--- a/mcpgateway/admin.py
+++ b/mcpgateway/admin.py
@@ -13720,15 +13720,15 @@ async def admin_delete_a2a_agent(
@admin_router.post("/a2a/{agent_id}/test")
async def admin_test_a2a_agent(
agent_id: str,
- request: Request, # pylint: disable=unused-argument
+ request: Request,
db: Session = Depends(get_db),
- user=Depends(get_current_user_with_permissions), # pylint: disable=unused-argument
+ user=Depends(get_current_user_with_permissions),
) -> JSONResponse:
"""Test A2A agent via admin UI.
Args:
agent_id: Agent ID
- request: FastAPI request object
+ request: FastAPI request object containing optional 'query' field
db: Database session
user: Authenticated user
@@ -13746,16 +13746,25 @@ async def admin_test_a2a_agent(
# Get the agent by ID
agent = await a2a_service.get_agent(db, agent_id)
+ # Parse request body to get user-provided query
+ default_message = "Hello from MCP Gateway Admin UI test!"
+ try:
+ body = await request.json()
+ # Use 'or' to also handle empty string queries
+ user_query = (body.get("query") if body else None) or default_message
+ except Exception:
+ user_query = default_message
+
# Prepare test parameters based on agent type and endpoint
if agent.agent_type in ["generic", "jsonrpc"] or agent.endpoint_url.endswith("/"):
# JSONRPC format for agents that expect it
test_params = {
"method": "message/send",
- "params": {"message": {"messageId": f"admin-test-{int(time.time())}", "role": "user", "parts": [{"type": "text", "text": "Hello from MCP Gateway Admin UI test!"}]}},
+ "params": {"message": {"messageId": f"admin-test-{int(time.time())}", "role": "user", "parts": [{"type": "text", "text": user_query}]}},
}
else:
# Generic test format
- test_params = {"message": "Hello from MCP Gateway Admin UI test!", "test": True, "timestamp": int(time.time())}
+ test_params = {"query": user_query, "message": user_query, "test": True, "timestamp": int(time.time())}
# Invoke the agent
result = await a2a_service.invoke_agent(
diff --git a/mcpgateway/services/a2a_service.py b/mcpgateway/services/a2a_service.py
index d6ef25058..298b0ad0c 100644
--- a/mcpgateway/services/a2a_service.py
+++ b/mcpgateway/services/a2a_service.py
@@ -313,40 +313,66 @@ async def register_agent(
)
db.add(new_agent)
- db.flush() # Flush to get the agent ID without committing yet
+ # Commit agent FIRST to ensure it persists even if tool creation fails
+ # This is critical because ToolService.register_tool calls db.rollback()
+ # on error, which would undo a pending (flushed but uncommitted) agent
+ db.commit()
+ db.refresh(new_agent)
# Invalidate caches since agent count changed
- a2a_stats_cache.invalidate()
- cache = _get_registry_cache()
- await cache.invalidate_agents()
- # Also invalidate tags cache since agent tags may have changed
- # First-Party
- from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
+ # Wrapped in try/except to ensure cache failures don't fail the request
+ # when the agent is already successfully committed
+ try:
+ a2a_stats_cache.invalidate()
+ cache = _get_registry_cache()
+ await cache.invalidate_agents()
+ # Also invalidate tags cache since agent tags may have changed
+ # First-Party
+ from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
- await admin_stats_cache.invalidate_tags()
- # First-Party
- from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
+ await admin_stats_cache.invalidate_tags()
+ # First-Party
+ from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
- metrics_cache.invalidate("a2a")
+ metrics_cache.invalidate("a2a")
+ except Exception as cache_error:
+ logger.warning(f"Cache invalidation failed after agent commit: {cache_error}")
# Automatically create a tool for the A2A agent if not already present
- tool_service = ToolService()
- tool_db = await tool_service.create_tool_from_a2a_agent(
- db=db,
- agent=new_agent,
- created_by=created_by,
- created_from_ip=created_from_ip,
- created_via=created_via,
- created_user_agent=created_user_agent,
- )
-
- # Associate the tool with the agent using the relationship
- # This sets both the tool_id foreign key and the tool relationship
- new_agent.tool = tool_db
- db.commit()
- db.refresh(new_agent)
+ # Tool creation is wrapped in try/except to ensure agent registration succeeds
+ # even if tool creation fails (e.g., due to visibility or permission issues)
+ tool_db = None
+ try:
+ tool_service = ToolService()
+ tool_db = await tool_service.create_tool_from_a2a_agent(
+ db=db,
+ agent=new_agent,
+ created_by=created_by,
+ created_from_ip=created_from_ip,
+ created_via=created_via,
+ created_user_agent=created_user_agent,
+ )
- logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) with tool ID: {tool_db.id}")
+ # Associate the tool with the agent using the relationship
+ # This sets both the tool_id foreign key and the tool relationship
+ new_agent.tool = tool_db
+ db.commit()
+ db.refresh(new_agent)
+ logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) with tool ID: {tool_db.id}")
+ except Exception as tool_error:
+ # Log the error but don't fail agent registration
+ # Agent was already committed above, so it persists even if tool creation fails
+ logger.warning(f"Failed to create tool for A2A agent {new_agent.name}: {tool_error}")
+ structured_logger.warning(
+ f"A2A agent '{new_agent.name}' created without tool association",
+ user_id=created_by,
+ resource_type="a2a_agent",
+ resource_id=str(new_agent.id),
+ custom_fields={"error": str(tool_error), "agent_name": new_agent.name},
+ )
+ # Refresh the agent to ensure it's in a clean state after any rollback
+ db.refresh(new_agent)
+ logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) without tool")
# Log A2A agent registration for lifecycle tracking
structured_logger.info(
diff --git a/mcpgateway/services/tool_service.py b/mcpgateway/services/tool_service.py
index 34a54fe30..745ecd758 100644
--- a/mcpgateway/services/tool_service.py
+++ b/mcpgateway/services/tool_service.py
@@ -3738,6 +3738,10 @@ async def create_tool_from_a2a_agent(
tags=normalized_tags,
)
+ # Default to "public" visibility if agent visibility is not set
+ # This ensures A2A tools are visible in the Global Tools Tab
+ tool_visibility = agent.visibility or "public"
+
tool_read = await self.register_tool(
db,
tool_data,
@@ -3747,7 +3751,7 @@ async def create_tool_from_a2a_agent(
created_user_agent=created_user_agent,
team_id=agent.team_id,
owner_email=agent.owner_email,
- visibility=agent.visibility,
+ visibility=tool_visibility,
)
# Return the DbTool object for relationship assignment
@@ -3919,20 +3923,21 @@ async def _call_a2a_agent(self, agent: DbA2AAgent, parameters: Dict[str, Any]):
Exception: If the call fails.
"""
logger.info(f"Calling A2A agent '{agent.name}' at {agent.endpoint_url} with arguments: {parameters}")
- # Patch: Build correct JSON-RPC params structure from flat UI input
- params = None
- # If UI sends flat fields, convert to nested message structure
- if isinstance(parameters, dict) and "query" in parameters and isinstance(parameters["query"], str):
- # Build the nested message object
- message_id = f"admin-test-{int(time.time())}"
- params = {"message": {"messageId": message_id, "role": "user", "parts": [{"type": "text", "text": parameters["query"]}]}}
- method = parameters.get("method", "message/send")
- else:
- # Already in correct format or unknown, pass through
- params = parameters.get("params", parameters)
- method = parameters.get("method", "message/send")
+ # Build request data based on agent type
if agent.agent_type in ["generic", "jsonrpc"] or agent.endpoint_url.endswith("/"):
+ # JSONRPC agents: Convert flat query to nested message structure
+ params = None
+ if isinstance(parameters, dict) and "query" in parameters and isinstance(parameters["query"], str):
+ # Build the nested message object for JSONRPC protocol
+ message_id = f"admin-test-{int(time.time())}"
+ params = {"message": {"messageId": message_id, "role": "user", "parts": [{"type": "text", "text": parameters["query"]}]}}
+ method = parameters.get("method", "message/send")
+ else:
+ # Already in correct format or unknown, pass through
+ params = parameters.get("params", parameters)
+ method = parameters.get("method", "message/send")
+
try:
request_data = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
logger.info(f"invoke tool JSONRPC request_data prepared: {request_data}")
@@ -3940,8 +3945,11 @@ async def _call_a2a_agent(self, agent: DbA2AAgent, parameters: Dict[str, Any]):
logger.error(f"Error preparing JSONRPC request data: {e}")
raise
else:
- logger.info(f"invoke tool Using custom A2A format for A2A agent '{parameters}'")
- request_data = {"interaction_type": parameters.get("interaction_type", "query"), "parameters": params, "protocol_version": agent.protocol_version}
+ # Custom agents: Pass parameters directly without JSONRPC message conversion
+ # Custom agents expect flat fields like {"query": "...", "message": "..."}
+ params = parameters if isinstance(parameters, dict) else {}
+ logger.info(f"invoke tool Using custom A2A format for A2A agent '{params}'")
+ request_data = {"interaction_type": params.get("interaction_type", "query"), "parameters": params, "protocol_version": agent.protocol_version}
logger.info(f"invoke tool request_data prepared: {request_data}")
# Make HTTP request to the agent endpoint using shared HTTP client
# First-Party
diff --git a/mcpgateway/static/admin.js b/mcpgateway/static/admin.js
index f93e39b69..1a8f92804 100644
--- a/mcpgateway/static/admin.js
+++ b/mcpgateway/static/admin.js
@@ -750,11 +750,13 @@ function closeModal(modalId, clearId = null) {
if (modalId === "gateway-test-modal") {
cleanupGatewayTestModal();
} else if (modalId === "tool-test-modal") {
- cleanupToolTestModal(); // ADD THIS LINE
+ cleanupToolTestModal();
} else if (modalId === "prompt-test-modal") {
cleanupPromptTestModal();
} else if (modalId === "resource-test-modal") {
cleanupResourceTestModal();
+ } else if (modalId === "a2a-test-modal") {
+ cleanupA2ATestModal();
}
modal.classList.add("hidden");
@@ -19030,58 +19032,129 @@ function getCookie(name) {
window.resetImportFile = resetImportFile;
// ===================================================================
-// A2A AGENT TESTING FUNCTIONALITY
+// A2A AGENT TEST MODAL FUNCTIONALITY
// ===================================================================
+let a2aTestFormHandler = null;
+let a2aTestCloseHandler = null;
+
/**
- * Test an A2A agent by making a direct invocation call
+ * Open A2A test modal with agent details
* @param {string} agentId - ID of the agent to test
* @param {string} agentName - Name of the agent for display
* @param {string} endpointUrl - Endpoint URL of the agent
*/
async function testA2AAgent(agentId, agentName, endpointUrl) {
try {
- // Show loading state
- const testResult = document.getElementById(`test-result-${agentId}`);
- testResult.innerHTML =
- '
🔄 Testing agent...
';
- testResult.classList.remove("hidden");
+ console.log("Opening A2A test modal for:", agentName);
- // Get auth token using the robust getAuthToken function
- const token = await getAuthToken();
+ // Clean up any existing event listeners
+ cleanupA2ATestModal();
- // Debug logging
- console.log("Available cookies:", document.cookie);
- console.log(
- "Found token:",
- token ? "Yes (length: " + token.length + ")" : "No",
- );
+ // Open the modal
+ openModal("a2a-test-modal");
- // Prepare headers
- const headers = {
- "Content-Type": "application/json",
- };
+ // Set modal title and description
+ const titleElement = safeGetElement("a2a-test-modal-title");
+ const descElement = safeGetElement("a2a-test-modal-description");
+ const agentIdInput = safeGetElement("a2a-test-agent-id");
+ const queryInput = safeGetElement("a2a-test-query");
+ const resultDiv = safeGetElement("a2a-test-result");
+
+ if (titleElement) {
+ titleElement.textContent = `Test A2A Agent: ${agentName}`;
+ }
+ if (descElement) {
+ descElement.textContent = `Endpoint: ${endpointUrl}`;
+ }
+ if (agentIdInput) {
+ agentIdInput.value = agentId;
+ }
+ if (queryInput) {
+ // Reset to default value
+ queryInput.value = "Hello from MCP Gateway Admin UI test!";
+ }
+ if (resultDiv) {
+ resultDiv.classList.add("hidden");
+ }
+
+ // Set up form submission handler
+ const form = safeGetElement("a2a-test-form");
+ if (form) {
+ a2aTestFormHandler = async (e) => {
+ await handleA2ATestSubmit(e);
+ };
+ form.addEventListener("submit", a2aTestFormHandler);
+ }
+ // Set up close button handler
+ const closeButton = safeGetElement("a2a-test-close");
+ if (closeButton) {
+ a2aTestCloseHandler = () => {
+ handleA2ATestClose();
+ };
+ closeButton.addEventListener("click", a2aTestCloseHandler);
+ }
+ } catch (error) {
+ console.error("Error setting up A2A test modal:", error);
+ showErrorMessage("Failed to open A2A test modal");
+ }
+}
+
+/**
+ * Handle A2A test form submission
+ * @param {Event} e - Form submit event
+ */
+async function handleA2ATestSubmit(e) {
+ e.preventDefault();
+
+ const loading = safeGetElement("a2a-test-loading");
+ const responseDiv = safeGetElement("a2a-test-response-json");
+ const resultDiv = safeGetElement("a2a-test-result");
+ const testButton = safeGetElement("a2a-test-submit");
+
+ try {
+ // Show loading
+ if (loading) {
+ loading.classList.remove("hidden");
+ }
+ if (resultDiv) {
+ resultDiv.classList.add("hidden");
+ }
+ if (testButton) {
+ testButton.disabled = true;
+ testButton.textContent = "Testing...";
+ }
+
+ const agentId = safeGetElement("a2a-test-agent-id")?.value;
+ const query =
+ safeGetElement("a2a-test-query")?.value ||
+ "Hello from MCP Gateway Admin UI test!";
+
+ if (!agentId) {
+ throw new Error("Agent ID is missing");
+ }
+
+ // Get auth token
+ const token = await getAuthToken();
+ const headers = { "Content-Type": "application/json" };
if (token) {
headers.Authorization = `Bearer ${token}`;
} else {
// Fallback to basic auth if JWT not available
console.warn("JWT token not found, attempting basic auth fallback");
- headers.Authorization = "Basic " + btoa("admin:changeme"); // Default admin credentials
+ headers.Authorization = "Basic " + btoa("admin:changeme");
}
- // Test payload is now determined server-side based on agent configuration
- const testPayload = {};
-
- // Make test request to A2A agent via admin endpoint
+ // Send test request with user query
const response = await fetchWithTimeout(
`${window.ROOT_PATH}/admin/a2a/${agentId}/test`,
{
method: "POST",
headers,
- body: JSON.stringify(testPayload),
+ body: JSON.stringify({ query }),
},
- window.MCPGATEWAY_UI_TOOL_TEST_TIMEOUT || 60000, // Use configurable timeout
+ window.MCPGATEWAY_UI_TOOL_TEST_TIMEOUT || 60000,
);
if (!response.ok) {
@@ -19091,57 +19164,100 @@ async function testA2AAgent(agentId, agentName, endpointUrl) {
const result = await response.json();
// Display result
- let resultHtml;
- if (!result.success || result.error) {
- resultHtml = `
-
+
+
+
+
+ Test A2A Agent
+
+
+
+
+
+
+
=3.12.15",
"argparse-manpage>=4.7",
"autoflake>=2.3.1",
diff --git a/scripts/demo_a2a_agent.py b/scripts/demo_a2a_agent.py
new file mode 100755
index 000000000..625748d60
--- /dev/null
+++ b/scripts/demo_a2a_agent.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""Demo A2A Agent for Issue #840 Testing.
+
+This script creates a simple A2A agent with calculator and weather tools,
+runs it on an available port, and registers it with ContextForge.
+
+Usage:
+ uv run python scripts/demo_a2a_agent.py
+
+The agent supports queries like:
+ - "calc: 5*10+2"
+ - "weather: Dallas"
+
+Press Ctrl+C to stop the server and unregister the agent.
+"""
+
+import atexit
+import os
+import random
+import signal
+import socket
+import sys
+from contextlib import closing
+
+import httpx
+import jwt
+import uvicorn
+from fastapi import FastAPI
+from pydantic import BaseModel
+
+# ============================================================================
+# A2A Agent Implementation (from Issue #840)
+# ============================================================================
+
+
+def calculator(expression: str) -> str:
+ """Evaluate a math expression safely using ast module."""
+ import ast
+ import operator
+
+ # Supported operators for safe evaluation
+ operators = {
+ ast.Add: operator.add,
+ ast.Sub: operator.sub,
+ ast.Mult: operator.mul,
+ ast.Div: operator.truediv,
+ ast.USub: operator.neg,
+ ast.UAdd: operator.pos,
+ }
+
+ def safe_eval(node):
+ """Recursively evaluate an AST node safely."""
+ if isinstance(node, ast.Constant): # Python 3.8+
+ if isinstance(node.value, (int, float)):
+ return node.value
+ raise ValueError(f"Invalid constant type: {type(node.value)}")
+ elif isinstance(node, ast.BinOp):
+ if type(node.op) not in operators:
+ raise ValueError(f"Unsupported operator: {type(node.op).__name__}")
+ left = safe_eval(node.left)
+ right = safe_eval(node.right)
+ return operators[type(node.op)](left, right)
+ elif isinstance(node, ast.UnaryOp):
+ if type(node.op) not in operators:
+ raise ValueError(f"Unsupported operator: {type(node.op).__name__}")
+ operand = safe_eval(node.operand)
+ return operators[type(node.op)](operand)
+ elif isinstance(node, ast.Expression):
+ return safe_eval(node.body)
+ else:
+ raise ValueError(f"Unsupported expression type: {type(node).__name__}")
+
+ try:
+ # Parse the expression into an AST
+ tree = ast.parse(expression, mode="eval")
+ result = safe_eval(tree)
+ return str(result)
+ except (SyntaxError, ValueError) as e:
+ return f"Error: {e}"
+ except ZeroDivisionError:
+ return "Error: Division by zero"
+ except Exception as e:
+ return f"Error: {e}"
+
+
+def weather(city: str) -> str:
+ """Mock weather lookup tool."""
+ conditions = ["sunny", "rainy", "cloudy", "stormy"]
+ temp = random.randint(10, 35)
+ return f"The weather in {city} is {random.choice(conditions)}, {temp}C"
+
+
+class SimpleAgent:
+ """Simple A2A agent that routes queries to tools."""
+
+ def __init__(self, name: str = "Agent"):
+ self.name = name
+ self.tools = {
+ "calculator": calculator,
+ "weather": weather,
+ }
+
+ def run(self, query: str) -> str:
+ """Process a query and route to appropriate tool."""
+ if "calc:" in query.lower():
+ expr = query.lower().replace("calc:", "").strip()
+ return self.tools["calculator"](expr)
+ elif "weather:" in query.lower():
+ city = query.lower().replace("weather:", "").strip()
+ return self.tools["weather"](city.title())
+ else:
+ return f"{self.name} received: {query}. Try 'calc: 5*10' or 'weather: Dallas'"
+
+
+# ============================================================================
+# FastAPI Application
+# ============================================================================
+
+app = FastAPI(title="Demo A2A Agent", description="Calculator and Weather Agent for Issue #840")
+agent = SimpleAgent("Demo-A2A-Agent")
+
+
+class Parameters(BaseModel):
+ """Parameters object containing the actual query."""
+
+ query: str = ""
+ message: str = ""
+
+
+class A2ARequest(BaseModel):
+ """Request model for A2A protocol format.
+
+ ContextForge sends custom agents requests in this format:
+ {
+ "interaction_type": "admin_test",
+ "parameters": {"query": "weather: Dallas", "message": "..."},
+ "protocol_version": "1.0"
+ }
+ """
+
+ interaction_type: str = ""
+ parameters: Parameters | None = None
+ protocol_version: str = ""
+ # Also support direct query/message for simple testing
+ query: str = ""
+ message: str = ""
+
+
+class Response(BaseModel):
+ """Response model for agent results."""
+
+ response: str
+
+
+@app.post("/run")
+def run_agent(req: A2ARequest) -> Response:
+ """Execute a query against the agent.
+
+ Supports both:
+ - A2A protocol format: {"parameters": {"query": "..."}}
+ - Simple format: {"query": "..."}
+ """
+ # Extract query from A2A protocol format (parameters.query)
+ # or fall back to direct query/message fields
+ query_text = ""
+ if req.parameters:
+ query_text = req.parameters.query or req.parameters.message
+ if not query_text:
+ query_text = req.query or req.message or "Hello"
+
+ response = agent.run(query_text)
+ return Response(response=response)
+
+
+@app.get("/health")
+def health():
+ """Health check endpoint."""
+ return {"status": "healthy", "agent": agent.name}
+
+
+# ============================================================================
+# ContextForge Registration
+# ============================================================================
+
+# Configuration from environment with fallbacks for local development
+CONTEXTFORGE_URL = os.environ.get("CONTEXTFORGE_URL", "http://localhost:8000")
+JWT_SECRET = os.environ.get("JWT_SECRET_KEY", "my-test-key") # noqa: S105 - default for demo only
+AGENT_ID = None
+
+
+def create_jwt_token(username: str = "admin@example.com") -> str:
+ """Create a JWT token for ContextForge authentication."""
+ import datetime
+
+ payload = {
+ "sub": username,
+ "email": username,
+ "iat": int(datetime.datetime.now(datetime.timezone.utc).timestamp()),
+ "exp": int((datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)).timestamp()),
+ "iss": "mcpgateway",
+ "aud": "mcpgateway-api",
+ "teams": [],
+ }
+ return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
+
+
+def register_agent(port: int) -> str | None:
+ """Register the A2A agent with ContextForge."""
+ global AGENT_ID
+
+ token = create_jwt_token()
+ headers = {
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {token}",
+ }
+
+ agent_data = {
+ "agent": {
+ "name": "Demo Calculator Agent",
+ "description": "Demo A2A Agent with calculator and weather tools (Issue #840)",
+ "endpoint_url": f"http://localhost:{port}/run",
+ "agent_type": "custom",
+ "protocol_version": "1.0",
+ "capabilities": {"tools": ["calculator", "weather"]},
+ "config": {},
+ "tags": ["demo", "calculator", "weather", "issue-840"],
+ },
+ "visibility": "public",
+ }
+
+ try:
+ with httpx.Client(timeout=10) as client:
+ response = client.post(f"{CONTEXTFORGE_URL}/a2a", headers=headers, json=agent_data)
+
+ if response.status_code == 201:
+ data = response.json()
+ AGENT_ID = data.get("id")
+ print(f"Registered A2A agent with ContextForge: {AGENT_ID}")
+ print(f" Name: {data.get('name')}")
+ print(f" Endpoint: {data.get('endpointUrl')}")
+ return AGENT_ID
+ else:
+ print(f"Failed to register agent: {response.status_code}")
+ print(f" Response: {response.text}")
+ return None
+ except Exception as e:
+ print(f"Error registering agent: {e}")
+ return None
+
+
+def unregister_agent():
+ """Unregister the A2A agent from ContextForge."""
+ if not AGENT_ID:
+ return
+
+ token = create_jwt_token()
+ headers = {"Authorization": f"Bearer {token}"}
+
+ try:
+ with httpx.Client(timeout=10) as client:
+ response = client.delete(f"{CONTEXTFORGE_URL}/a2a/{AGENT_ID}", headers=headers)
+ if response.status_code in (200, 204):
+ print(f"Unregistered A2A agent: {AGENT_ID}")
+ else:
+ print(f"Failed to unregister agent: {response.status_code}")
+ except Exception as e:
+ print(f"Error unregistering agent: {e}")
+
+
+def find_available_port(start: int = 9100, end: int = 9200) -> int:
+ """Find an available port in the given range."""
+ for port in range(start, end):
+ with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+ if sock.connect_ex(("localhost", port)) != 0:
+ return port
+ raise RuntimeError(f"No available port found in range {start}-{end}")
+
+
+# ============================================================================
+# Main Entry Point
+# ============================================================================
+
+
+def main():
+ """Run the demo A2A agent."""
+ # Find available port
+ port = find_available_port()
+ print(f"\n{'='*60}")
+ print("Demo A2A Agent for Issue #840")
+ print(f"{'='*60}")
+ print(f"Starting agent on port {port}...")
+ print("\nSupported queries:")
+ print(" - calc: 5*10+2")
+ print(" - weather: Dallas")
+ print("\nPress Ctrl+C to stop\n")
+
+ # Register cleanup handler
+ atexit.register(unregister_agent)
+
+ def signal_handler(sig, frame):
+ print("\nShutting down...")
+ unregister_agent()
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ # Register with ContextForge
+ register_agent(port)
+
+ # Start the server
+ uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/integration/test_a2a_sdk_integration.py b/tests/integration/test_a2a_sdk_integration.py
new file mode 100644
index 000000000..001889c6a
--- /dev/null
+++ b/tests/integration/test_a2a_sdk_integration.py
@@ -0,0 +1,657 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""Integration tests for A2A agent support using the official A2A Python SDK.
+
+This module tests Issue #840 features:
+1. User input field for A2A agent testing
+2. Tool visibility fix (defaulting to public)
+3. Transaction handling for agent registration
+
+These tests use the official a2a-sdk to create proper A2A servers and clients,
+ensuring compatibility with the A2A protocol specification.
+"""
+import socket
+from contextlib import closing
+from unittest.mock import MagicMock
+
+import httpx
+import pytest
+import pytest_asyncio
+
+# A2A SDK imports
+from a2a.server.agent_execution import AgentExecutor, RequestContext
+from a2a.server.apps import A2AFastAPIApplication
+from a2a.server.events import EventQueue
+from a2a.server.request_handlers import DefaultRequestHandler
+from a2a.server.tasks import InMemoryTaskStore
+from a2a.types import AgentCapabilities, AgentCard, AgentSkill
+from a2a.utils import new_agent_text_message
+
+# ContextForge imports
+from mcpgateway.services.a2a_service import A2AAgentService
+from mcpgateway.services.tool_service import ToolService
+
+# Mark all tests in this module as integration tests
+# These tests require --with-integration flag to run
+pytestmark = pytest.mark.integration
+
+
+# =============================================================================
+# Test A2A Agent Implementation using Official SDK
+# =============================================================================
+
+
+class CalculatorAgent:
+ """Simple calculator agent for testing."""
+
+ async def invoke(self, query: str) -> str:
+ """Process a calculator query."""
+ import ast
+ import operator
+
+ operators = {
+ ast.Add: operator.add,
+ ast.Sub: operator.sub,
+ ast.Mult: operator.mul,
+ ast.Div: operator.truediv,
+ ast.USub: operator.neg,
+ ast.UAdd: operator.pos,
+ }
+
+ def safe_eval(node):
+ if isinstance(node, ast.Constant):
+ if isinstance(node.value, (int, float)):
+ return node.value
+ raise ValueError(f"Invalid constant type: {type(node.value)}")
+ elif isinstance(node, ast.BinOp):
+ if type(node.op) not in operators:
+ raise ValueError(f"Unsupported operator: {type(node.op).__name__}")
+ left = safe_eval(node.left)
+ right = safe_eval(node.right)
+ return operators[type(node.op)](left, right)
+ elif isinstance(node, ast.UnaryOp):
+ if type(node.op) not in operators:
+ raise ValueError(f"Unsupported operator: {type(node.op).__name__}")
+ operand = safe_eval(node.operand)
+ return operators[type(node.op)](operand)
+ elif isinstance(node, ast.Expression):
+ return safe_eval(node.body)
+ else:
+ raise ValueError(f"Unsupported expression type: {type(node).__name__}")
+
+ # Extract expression from query
+ expression = query.lower().replace("calc:", "").strip() if "calc:" in query.lower() else query
+
+ try:
+ tree = ast.parse(expression, mode="eval")
+ result = safe_eval(tree)
+ return str(result)
+ except (SyntaxError, ValueError) as e:
+ return f"Error: {e}"
+ except ZeroDivisionError:
+ return "Error: Division by zero"
+ except Exception as e:
+ return f"Error: {e}"
+
+
+class CalculatorAgentExecutor(AgentExecutor):
+ """Agent executor for the calculator agent."""
+
+ def __init__(self):
+ self.agent = CalculatorAgent()
+
+ async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
+ """Execute the calculator agent."""
+ user_input = context.get_user_input()
+ result = await self.agent.invoke(user_input)
+ await event_queue.enqueue_event(new_agent_text_message(result))
+
+ async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
+ """Handle cancellation."""
+ raise Exception("cancel not supported")
+
+
+def create_calculator_agent_card(port: int) -> AgentCard:
+ """Create an agent card for the calculator agent."""
+ skill = AgentSkill(
+ id="calculator",
+ name="Calculator",
+ description="Evaluates mathematical expressions safely",
+ tags=["math", "calculator"],
+ examples=["calc: 5*10+2", "calc: 100/4"],
+ )
+
+ return AgentCard(
+ name="Test Calculator Agent",
+ description="A test A2A agent with calculator functionality",
+ url=f"http://localhost:{port}/",
+ version="1.0.0",
+ default_input_modes=["text"],
+ default_output_modes=["text"],
+ capabilities=AgentCapabilities(streaming=True),
+ skills=[skill],
+ supports_authenticated_extended_card=False,
+ )
+
+
+def find_available_port(start: int = 19000, end: int = 19100) -> int:
+ """Find an available port in the given range."""
+ for port in range(start, end):
+ with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+ if sock.connect_ex(("localhost", port)) != 0:
+ return port
+ raise RuntimeError(f"No available port found in range {start}-{end}")
+
+
+# =============================================================================
+# Fixtures
+# =============================================================================
+
+
+@pytest.fixture
+def mock_db():
+ """Create a mock database session."""
+ db = MagicMock()
+ db.add = MagicMock()
+ db.commit = MagicMock()
+ db.flush = MagicMock()
+ db.refresh = MagicMock()
+ db.rollback = MagicMock()
+ db.execute = MagicMock()
+ db.get = MagicMock(return_value=None)
+ return db
+
+
+@pytest.fixture
+def a2a_service():
+ """Create an A2A service instance."""
+ return A2AAgentService()
+
+
+@pytest.fixture
+def tool_service():
+ """Create a tool service instance."""
+ return ToolService()
+
+
+@pytest.fixture
+def calculator_agent_card():
+ """Create a calculator agent card for testing."""
+ port = find_available_port()
+ return create_calculator_agent_card(port)
+
+
+@pytest_asyncio.fixture
+async def calculator_a2a_server():
+ """Create and run a calculator A2A server using the official SDK.
+
+ This fixture creates a proper A2A-compliant server that can be used
+ for integration testing with ContextForge.
+ """
+ port = find_available_port()
+ agent_card = create_calculator_agent_card(port)
+
+ executor = CalculatorAgentExecutor()
+ handler = DefaultRequestHandler(
+ agent_executor=executor,
+ task_store=InMemoryTaskStore(),
+ )
+
+ app_builder = A2AFastAPIApplication(
+ agent_card=agent_card,
+ http_handler=handler,
+ )
+ app = app_builder.build()
+
+ # Use httpx ASGITransport for in-memory testing
+ async with httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url=f"http://localhost:{port}") as client:
+ yield {
+ "client": client,
+ "port": port,
+ "agent_card": agent_card,
+ "app": app,
+ }
+
+
+# =============================================================================
+# Unit Tests for A2A Agent User Query Feature
+# =============================================================================
+
+
+class TestA2AUserQueryExtraction:
+ """Tests for user query extraction from request body."""
+
+ @pytest.mark.asyncio
+ async def test_extract_user_query_from_body(self):
+ """Test that user query is correctly extracted from request body."""
+
+ # Simulate request body parsing
+ body = {"query": "calc: 7*8"}
+ user_query = body.get("query", "default")
+ assert user_query == "calc: 7*8"
+
+ @pytest.mark.asyncio
+ async def test_default_query_when_body_empty(self):
+ """Test that default query is used when body is empty."""
+ body = {}
+ default_message = "Hello from MCP Gateway Admin UI test!"
+ user_query = body.get("query", default_message) if body else default_message
+ assert user_query == default_message
+
+ @pytest.mark.asyncio
+ async def test_default_query_when_body_none(self):
+ """Test that default query is used when body is None."""
+ body = None
+ default_message = "Hello from MCP Gateway Admin UI test!"
+ user_query = body.get("query", default_message) if body else default_message
+ assert user_query == default_message
+
+
+# =============================================================================
+# Unit Tests for Tool Visibility Fix
+# =============================================================================
+
+
+class TestToolVisibilityFix:
+ """Tests for tool visibility defaulting to public."""
+
+ @pytest.mark.asyncio
+ async def test_tool_visibility_defaults_to_public_when_agent_visibility_none(self, mock_db, tool_service):
+ """Test that tool visibility defaults to 'public' when agent visibility is None."""
+ # Create a mock agent with None visibility
+ mock_agent = MagicMock()
+ mock_agent.id = "test-agent-id"
+ mock_agent.name = "Test Agent"
+ mock_agent.slug = "test-agent"
+ mock_agent.description = "Test description"
+ mock_agent.endpoint_url = "http://localhost:9000/run"
+ mock_agent.agent_type = "custom"
+ mock_agent.visibility = None # Key: visibility is None
+ mock_agent.tags = ["a2a"]
+ mock_agent.team_id = None
+ mock_agent.owner_email = None
+ mock_agent.auth_type = None
+ mock_agent.auth_value = None
+
+ # Mock execute to return None (no existing tool)
+ mock_db.execute.return_value.scalar_one_or_none.return_value = None
+
+ # The visibility should default to "public"
+ tool_visibility = mock_agent.visibility or "public"
+ assert tool_visibility == "public"
+
+ @pytest.mark.asyncio
+ async def test_tool_visibility_respects_agent_visibility_when_set(self, mock_db, tool_service):
+ """Test that tool visibility respects agent visibility when explicitly set."""
+ # Create a mock agent with explicit visibility
+ mock_agent = MagicMock()
+ mock_agent.visibility = "team" # Explicitly set
+
+ tool_visibility = mock_agent.visibility or "public"
+ assert tool_visibility == "team"
+
+ @pytest.mark.asyncio
+ async def test_tool_visibility_public_when_agent_visibility_empty_string(self, mock_db, tool_service):
+ """Test that tool visibility defaults to 'public' when agent visibility is empty string."""
+ mock_agent = MagicMock()
+ mock_agent.visibility = "" # Empty string is falsy
+
+ tool_visibility = mock_agent.visibility or "public"
+ assert tool_visibility == "public"
+
+
+# =============================================================================
+# Unit Tests for Transaction Handling
+# =============================================================================
+
+
+class TestTransactionHandling:
+ """Tests for transaction handling during A2A agent registration."""
+
+ @pytest.mark.asyncio
+ async def test_agent_committed_before_tool_creation(self, mock_db, a2a_service):
+ """Test that agent is committed before tool creation is attempted.
+
+ This ensures that if tool creation fails (and calls rollback),
+ the agent registration is not lost.
+ """
+ # Track the order of operations
+ operation_order = []
+
+ def track_add(*args):
+ operation_order.append("add")
+
+ def track_commit():
+ operation_order.append("commit")
+
+ def track_flush():
+ operation_order.append("flush")
+
+ mock_db.add.side_effect = track_add
+ mock_db.commit.side_effect = track_commit
+ mock_db.flush.side_effect = track_flush
+
+ # The expected order after the fix is:
+ # 1. add (agent)
+ # 2. commit (agent - BEFORE tool creation)
+ # NOT: add -> flush -> [tool creation] -> commit/rollback
+
+ # Verify the fix ensures commit happens before tool creation
+ # by checking the code structure
+ import inspect
+
+ source = inspect.getsource(a2a_service.register_agent)
+
+ # The fix changes "db.flush()" to "db.commit()" before tool creation
+ # Check that we have the pattern: db.add -> db.commit -> tool creation
+ assert "db.add(new_agent)" in source
+ assert "db.commit()" in source
+ # The critical fix: commit should appear before create_tool_from_a2a_agent
+ add_pos = source.find("db.add(new_agent)")
+ commit_pos = source.find("db.commit()")
+ tool_creation_pos = source.find("create_tool_from_a2a_agent")
+
+ # Ensure commit comes between add and tool creation
+ assert add_pos < commit_pos < tool_creation_pos, "Agent should be committed before tool creation"
+
+ @pytest.mark.asyncio
+ async def test_agent_survives_tool_creation_failure(self, mock_db):
+ """Test that agent registration succeeds even if tool creation fails.
+
+ After the transaction handling fix, the agent is committed BEFORE
+ tool creation, so even if ToolService.register_tool calls rollback,
+ the agent persists.
+ """
+ # This test verifies the expected behavior after the fix:
+ # 1. Agent is added and committed
+ # 2. Tool creation is attempted
+ # 3. If tool creation fails (rollback), agent still exists
+
+ # The key insight is that after commit, a rollback only affects
+ # uncommitted changes, not the already-committed agent
+
+ agent_committed = False
+
+ def track_commit():
+ nonlocal agent_committed
+ agent_committed = True
+
+ mock_db.commit.side_effect = track_commit
+
+ # Simulate the flow:
+ # 1. Add agent
+ mock_db.add(MagicMock()) # new_agent
+ # 2. Commit agent (the fix!)
+ mock_db.commit()
+ assert agent_committed, "Agent should be committed before tool creation"
+
+ # 3. Tool creation fails and calls rollback
+ mock_db.rollback()
+
+ # 4. Agent should still be committed (rollback doesn't undo committed transaction)
+ assert agent_committed, "Agent should survive tool creation failure"
+
+
+# =============================================================================
+# Integration Tests with A2A SDK Server
+# =============================================================================
+
+
+class TestA2ASDKIntegration:
+ """Integration tests using the official A2A SDK."""
+
+ @pytest.mark.asyncio
+ async def test_calculator_agent_card_endpoint(self, calculator_a2a_server):
+ """Test that the A2A agent card is served correctly."""
+ client = calculator_a2a_server["client"]
+
+ response = await client.get("/.well-known/agent.json")
+ assert response.status_code == 200
+
+ card = response.json()
+ assert card["name"] == "Test Calculator Agent"
+ assert card["capabilities"]["streaming"] is True
+ assert len(card["skills"]) == 1
+ assert card["skills"][0]["id"] == "calculator"
+
+ @pytest.mark.asyncio
+ async def test_calculator_agent_message_send(self, calculator_a2a_server):
+ """Test sending a message to the calculator agent via JSON-RPC."""
+ client = calculator_a2a_server["client"]
+
+ # Send a calculation request using JSON-RPC protocol
+ response = await client.post(
+ "/",
+ json={
+ "jsonrpc": "2.0",
+ "id": "test-1",
+ "method": "message/send",
+ "params": {
+ "message": {
+ "messageId": "msg-test-1",
+ "role": "user",
+ "parts": [{"kind": "text", "text": "calc: 7*8"}],
+ }
+ },
+ },
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert "result" in data or "error" not in data
+
+ @pytest.mark.asyncio
+ async def test_calculator_agent_streaming(self, calculator_a2a_server):
+ """Test streaming response from calculator agent."""
+ client = calculator_a2a_server["client"]
+
+ # Request with streaming
+ response = await client.post(
+ "/",
+ json={
+ "jsonrpc": "2.0",
+ "id": "test-2",
+ "method": "message/send",
+ "params": {
+ "message": {
+ "messageId": "msg-test-2",
+ "role": "user",
+ "parts": [{"kind": "text", "text": "calc: 100/4+25"}],
+ }
+ },
+ },
+ headers={"Accept": "text/event-stream"},
+ )
+
+ # Either streaming or non-streaming response is valid
+ assert response.status_code == 200
+
+
+class TestA2AProtocolCompliance:
+ """Tests for A2A protocol compliance."""
+
+ @pytest.mark.asyncio
+ async def test_agent_card_has_required_fields(self, calculator_a2a_server):
+ """Test that agent card has all required A2A protocol fields."""
+ client = calculator_a2a_server["client"]
+
+ response = await client.get("/.well-known/agent.json")
+ card = response.json()
+
+ # Required fields per A2A spec
+ required_fields = ["name", "description", "url", "version", "capabilities", "skills"]
+ for field in required_fields:
+ assert field in card, f"Missing required field: {field}"
+
+ @pytest.mark.asyncio
+ async def test_message_send_returns_task_or_message(self, calculator_a2a_server):
+ """Test that message/send returns either a Task or Message per A2A spec."""
+ client = calculator_a2a_server["client"]
+
+ response = await client.post(
+ "/",
+ json={
+ "jsonrpc": "2.0",
+ "id": "test-3",
+ "method": "message/send",
+ "params": {
+ "message": {
+ "messageId": "msg-test-3",
+ "role": "user",
+ "parts": [{"kind": "text", "text": "calc: 2+2"}],
+ }
+ },
+ },
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+
+ # Result should be either a Task (with id, status) or Message (with messageId, role, parts)
+ if "result" in data:
+ result = data["result"]
+ is_task = "id" in result and "status" in result
+ is_message = "messageId" in result and "role" in result and "parts" in result
+ assert is_task or is_message, "Result should be either Task or Message"
+
+
+# =============================================================================
+# Tests for ContextForge Admin A2A Test Endpoint
+# =============================================================================
+
+
+class TestContextForgeA2ATestEndpoint:
+ """Tests for the ContextForge admin A2A test endpoint."""
+
+ @pytest.mark.asyncio
+ async def test_admin_test_endpoint_sends_user_query(self):
+ """Test that admin test endpoint sends user-provided query to agent."""
+ # Mock the admin endpoint behavior
+ user_query = "calc: 15*3"
+ default_message = "Hello from MCP Gateway Admin UI test!"
+
+ # Simulate request body parsing (as done in admin.py)
+ body = {"query": user_query}
+ extracted_query = body.get("query", default_message) if body else default_message
+
+ assert extracted_query == user_query
+ assert extracted_query != default_message
+
+ @pytest.mark.asyncio
+ async def test_admin_test_endpoint_uses_default_when_no_query(self):
+ """Test that admin test endpoint uses default message when no query provided."""
+ default_message = "Hello from MCP Gateway Admin UI test!"
+
+ # Empty body
+ body = {}
+ extracted_query = (body.get("query") if body else None) or default_message
+ assert extracted_query == default_message
+
+ # Body with empty query - should also use default
+ body = {"query": ""}
+ extracted_query = (body.get("query") if body else None) or default_message
+ assert extracted_query == default_message
+
+ # Body with None query
+ body = {"query": None}
+ extracted_query = (body.get("query") if body else None) or default_message
+ assert extracted_query == default_message
+
+ @pytest.mark.asyncio
+ async def test_jsonrpc_format_includes_user_query(self):
+ """Test that JSONRPC format includes user query in message parts."""
+ import time
+
+ user_query = "calc: 100/5"
+
+ # Simulate JSONRPC format construction (as done in admin.py)
+ test_params = {
+ "method": "message/send",
+ "params": {
+ "message": {
+ "messageId": f"admin-test-{int(time.time())}",
+ "role": "user",
+ "parts": [{"type": "text", "text": user_query}],
+ }
+ },
+ }
+
+ # Verify query is in the message
+ message_text = test_params["params"]["message"]["parts"][0]["text"]
+ assert message_text == user_query
+
+ @pytest.mark.asyncio
+ async def test_custom_agent_format_includes_user_query(self):
+ """Test that custom agent format includes user query in parameters."""
+ user_query = "weather: Dallas"
+
+ # Simulate custom format construction (as done in admin.py)
+ test_params = {
+ "interaction_type": "admin_test",
+ "parameters": {"query": user_query, "message": user_query},
+ "protocol_version": "1.0",
+ }
+
+ # Verify query is in parameters
+ assert test_params["parameters"]["query"] == user_query
+ assert test_params["parameters"]["message"] == user_query
+
+
+# =============================================================================
+# Calculator Agent Unit Tests
+# =============================================================================
+
+
+class TestCalculatorAgent:
+ """Unit tests for the calculator agent implementation."""
+
+ @pytest.mark.asyncio
+ async def test_basic_arithmetic(self):
+ """Test basic arithmetic operations."""
+ agent = CalculatorAgent()
+
+ assert await agent.invoke("calc: 2+2") == "4"
+ assert await agent.invoke("calc: 10-3") == "7"
+ assert await agent.invoke("calc: 5*6") == "30"
+ assert await agent.invoke("calc: 20/4") == "5.0"
+
+ @pytest.mark.asyncio
+ async def test_complex_expressions(self):
+ """Test complex mathematical expressions."""
+ agent = CalculatorAgent()
+
+ assert await agent.invoke("calc: 7*8") == "56"
+ assert await agent.invoke("calc: 100/4+25") == "50.0"
+ assert await agent.invoke("calc: (2+3)*4") == "20"
+
+ @pytest.mark.asyncio
+ async def test_negative_numbers(self):
+ """Test negative number handling."""
+ agent = CalculatorAgent()
+
+ assert await agent.invoke("calc: -5") == "-5"
+ assert await agent.invoke("calc: -5+10") == "5"
+
+ @pytest.mark.asyncio
+ async def test_division_by_zero(self):
+ """Test division by zero error handling."""
+ agent = CalculatorAgent()
+
+ result = await agent.invoke("calc: 10/0")
+ assert "Error" in result
+
+ @pytest.mark.asyncio
+ async def test_invalid_expression(self):
+ """Test invalid expression error handling."""
+ agent = CalculatorAgent()
+
+ result = await agent.invoke("calc: invalid")
+ assert "Error" in result
+
+ @pytest.mark.asyncio
+ async def test_query_without_prefix(self):
+ """Test that queries without 'calc:' prefix still work."""
+ agent = CalculatorAgent()
+
+ # Direct expression
+ assert await agent.invoke("5*5") == "25"
diff --git a/tests/unit/mcpgateway/services/test_a2a_service.py b/tests/unit/mcpgateway/services/test_a2a_service.py
index 379ed9e0c..bd1c0df45 100644
--- a/tests/unit/mcpgateway/services/test_a2a_service.py
+++ b/tests/unit/mcpgateway/services/test_a2a_service.py
@@ -145,8 +145,10 @@ async def test_register_agent_success(self, service, mock_db, sample_agent_creat
result = await service.register_agent(mock_db, sample_agent_create)
# Verify
+ # add: 1 for agent, 1 for tool
assert mock_db.add.call_count == 2
- assert mock_db.commit.call_count == 2
+ # commit: 1 for agent (before tool creation), 1 for tool, 1 for tool association
+ assert mock_db.commit.call_count == 3
assert service.convert_agent_to_read.called
async def test_register_agent_name_conflict(self, service, mock_db, sample_agent_create):
diff --git a/tests/unit/mcpgateway/test_issue_840_a2a_agent.py b/tests/unit/mcpgateway/test_issue_840_a2a_agent.py
new file mode 100644
index 000000000..dd18620d2
--- /dev/null
+++ b/tests/unit/mcpgateway/test_issue_840_a2a_agent.py
@@ -0,0 +1,743 @@
+# -*- coding: utf-8 -*-
+"""Location: ./tests/unit/mcpgateway/test_issue_840_a2a_agent.py
+Copyright 2025
+SPDX-License-Identifier: Apache-2.0
+
+Tests for GitHub Issue #840: A2A Agent testing bugs.
+
+This module contains tests that replicate the issues described in #840:
+1. A2A agent is exposed as an API endpoint but there is no way to test it
+ from the UI with user-provided input (no field to pass the user query).
+2. Tools from A2A Agent are not getting listed under the Global Tools Tab.
+
+These tests verify the expected behavior and will fail until the issues are fixed.
+"""
+
+# Standard
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock, patch
+import uuid
+
+# Third-Party
+import pytest
+from sqlalchemy.orm import Session
+
+# First-Party
+from mcpgateway.db import A2AAgent as DbA2AAgent
+from mcpgateway.db import Tool as DbTool
+from mcpgateway.schemas import ToolRead
+from mcpgateway.services.a2a_service import A2AAgentService
+from mcpgateway.services.tool_service import ToolService
+
+
+@pytest.fixture(autouse=True)
+def mock_logging_services():
+ """Mock structured_logger and audit_trail to prevent database writes during tests."""
+ with (
+ patch("mcpgateway.services.a2a_service.structured_logger") as mock_a2a_logger,
+ patch("mcpgateway.services.tool_service.structured_logger") as mock_tool_logger,
+ patch("mcpgateway.services.tool_service.audit_trail") as mock_tool_audit,
+ ):
+ mock_a2a_logger.log = MagicMock(return_value=None)
+ mock_a2a_logger.info = MagicMock(return_value=None)
+ mock_tool_logger.log = MagicMock(return_value=None)
+ mock_tool_logger.info = MagicMock(return_value=None)
+ mock_tool_audit.log_action = MagicMock(return_value=None)
+ yield
+
+
+class TestIssue840UserInputForA2AAgentTest:
+ """Test suite for Issue #840 - Part 1: A2A agent test endpoint lacks user input field.
+
+ The issue reports that A2A agents exposed as API endpoints cannot be tested
+ from the UI with user-provided input. The test button sends a hardcoded
+ message instead of allowing users to provide custom queries like:
+ - "calc: 5*10+2"
+ - "weather: Dallas"
+
+ These tests verify that:
+ 1. The test endpoint should accept user-provided query parameters
+ 2. The test payload should include the user's custom query
+ 3. The agent invocation should receive the user-specified input
+ """
+
+ @pytest.fixture
+ def a2a_service(self):
+ """Create A2A agent service instance."""
+ return A2AAgentService()
+
+ @pytest.fixture
+ def mock_db(self):
+ """Create mock database session."""
+ return MagicMock(spec=Session)
+
+ @pytest.fixture
+ def sample_a2a_agent(self):
+ """Sample A2A agent that expects user queries."""
+ agent_id = uuid.uuid4().hex
+ return MagicMock(
+ id=agent_id,
+ name="calculator-agent",
+ slug="calculator-agent",
+ description="A2A Agent that handles calc: and weather: queries",
+ endpoint_url="http://localhost:8000/run",
+ agent_type="custom",
+ protocol_version="1.0",
+ capabilities={"chat": True, "tools": True},
+ config={},
+ auth_type="none",
+ auth_value=None,
+ enabled=True,
+ reachable=True,
+ tags=[{"id": "a2a", "label": "a2a"}, {"id": "agent", "label": "agent"}],
+ created_at=datetime.now(timezone.utc),
+ updated_at=datetime.now(timezone.utc),
+ version=1,
+ metrics=[],
+ )
+
+ @patch("mcpgateway.services.metrics_buffer_service.get_metrics_buffer_service")
+ @patch("mcpgateway.services.a2a_service.fresh_db_session")
+ @patch("mcpgateway.services.http_client_service.get_http_client")
+ async def test_invoke_agent_with_custom_user_query(
+ self,
+ mock_get_client,
+ mock_fresh_db,
+ mock_metrics_buffer_fn,
+ a2a_service,
+ mock_db,
+ sample_a2a_agent,
+ ):
+ """Test that A2A agent can be invoked with custom user query.
+
+ This test demonstrates the expected behavior: users should be able to
+ send custom queries like "calc: 7*8" to A2A agents.
+
+ Issue #840 reports that the UI test button doesn't allow users to
+ provide custom input - it sends a hardcoded message instead.
+ """
+ # Mock HTTP client
+ mock_client = AsyncMock()
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"response": "56"} # Result of 7*8
+ mock_client.post.return_value = mock_response
+ mock_get_client.return_value = mock_client
+
+ # Mock get_agent_by_name to return our sample agent
+ a2a_service.get_agent_by_name = AsyncMock(return_value=sample_a2a_agent)
+
+ # Mock fresh_db_session
+ mock_ts_db = MagicMock()
+ mock_ts_db.execute.return_value.scalar_one_or_none.return_value = sample_a2a_agent
+ mock_fresh_db.return_value.__enter__.return_value = mock_ts_db
+ mock_fresh_db.return_value.__exit__.return_value = None
+
+ # Mock metrics buffer service
+ mock_metrics_buffer = MagicMock()
+ mock_metrics_buffer_fn.return_value = mock_metrics_buffer
+
+ # User's custom query - this is what the UI should allow
+ user_custom_query = {"query": "calc: 7*8"}
+
+ # Invoke the agent with user's custom query
+ result = await a2a_service.invoke_agent(
+ mock_db,
+ sample_a2a_agent.name,
+ user_custom_query,
+ "user_test",
+ )
+
+ # Verify the result
+ assert result["response"] == "56"
+
+ # CRITICAL: Verify that the user's custom query was sent to the agent
+ # This is the core of Issue #840 - the query should be user-provided
+ mock_client.post.assert_called_once()
+ call_args = mock_client.post.call_args
+
+ # The request body should contain the user's custom query
+ # In the current implementation, this works at the service level,
+ # but the UI doesn't provide a way to input the query
+ assert call_args is not None
+
+ async def test_admin_test_endpoint_should_accept_user_query(self):
+ """Test that verifies the admin test endpoint signature.
+
+ Issue #840 reports that the admin UI test button for A2A agents
+ sends an empty payload `{}` instead of allowing users to provide
+ custom queries.
+
+ The endpoint `/admin/a2a/{agent_id}/test` should accept a request body
+ with a user-provided query field.
+
+ Current behavior (BUG):
+ - JavaScript sends: testPayload = {}
+ - Server uses hardcoded: "Hello from MCP Gateway Admin UI test!"
+
+ Expected behavior (FIX NEEDED):
+ - JavaScript should provide input field for user query
+ - Server should use: request_body.query or request_body.message
+ """
+ # This test documents the expected behavior
+ # Currently, the admin endpoint uses hardcoded test parameters:
+ #
+ # From admin.py line 13334-13340:
+ # test_params = {
+ # "method": "message/send",
+ # "params": {"message": {..., "text": "Hello from MCP Gateway Admin UI test!"}}
+ # }
+ #
+ # The UI should instead:
+ # 1. Show an input field for the user to enter their query
+ # 2. Send the user's query to the endpoint
+ # 3. The endpoint should use that query in the test_params
+
+ # Document the expected request body format
+ expected_request_body = {
+ "query": "calc: 7*8", # User-provided query
+ }
+
+ # Document the expected test_params that should be sent to the agent
+ expected_test_params = {
+ "query": "calc: 7*8", # Should use user's query, not hardcoded message
+ }
+
+ # These assertions document the expected behavior
+ assert "query" in expected_request_body, "Request body should include user query"
+ assert expected_test_params["query"] == expected_request_body["query"], "Test params should use user's query"
+
+ @patch("mcpgateway.services.metrics_buffer_service.get_metrics_buffer_service")
+ @patch("mcpgateway.services.a2a_service.fresh_db_session")
+ @patch("mcpgateway.services.http_client_service.get_http_client")
+ async def test_custom_agent_receives_query_in_parameters(
+ self,
+ mock_get_client,
+ mock_fresh_db,
+ mock_metrics_buffer_fn,
+ a2a_service,
+ mock_db,
+ sample_a2a_agent,
+ ):
+ """Test that custom A2A agents receive query in parameters object.
+
+ Per A2A protocol, ContextForge sends custom agents requests in format:
+ {"interaction_type": "...", "parameters": {"query": "..."}, "protocol_version": "..."}
+
+ A2A-compliant agents should extract the query from parameters.query.
+
+ This test verifies that invoke_agent correctly sends the query
+ nested under the parameters object for custom agent types.
+ """
+ # Mock HTTP client to capture the actual request body
+ mock_client = AsyncMock()
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"response": "The weather in Dallas is sunny"}
+ mock_client.post.return_value = mock_response
+ mock_get_client.return_value = mock_client
+
+ # Mock get_agent_by_name
+ a2a_service.get_agent_by_name = AsyncMock(return_value=sample_a2a_agent)
+
+ # Mock fresh_db_session
+ mock_ts_db = MagicMock()
+ mock_ts_db.execute.return_value.scalar_one_or_none.return_value = sample_a2a_agent
+ mock_fresh_db.return_value.__enter__.return_value = mock_ts_db
+ mock_fresh_db.return_value.__exit__.return_value = None
+
+ # Mock metrics buffer service
+ mock_metrics_buffer = MagicMock()
+ mock_metrics_buffer_fn.return_value = mock_metrics_buffer
+
+ # This is what the admin test endpoint sends for custom agents
+ test_params = {"query": "weather: Dallas", "message": "weather: Dallas", "test": True}
+
+ # Invoke the agent
+ await a2a_service.invoke_agent(
+ mock_db,
+ sample_a2a_agent.name,
+ test_params,
+ "admin_test",
+ )
+
+ # Verify the HTTP client was called
+ mock_client.post.assert_called_once()
+ call_args = mock_client.post.call_args
+
+ # Extract the JSON body that was sent to the agent
+ request_body = call_args.kwargs.get("json") or call_args[1].get("json")
+ assert request_body is not None, "Request body should not be None"
+
+ # Per A2A protocol for custom agents, verify the structure:
+ # {"interaction_type": "...", "parameters": {"query": "..."}, "protocol_version": "..."}
+ assert "parameters" in request_body, "Request should contain 'parameters' object"
+ assert "interaction_type" in request_body, "Request should contain 'interaction_type'"
+
+ # Verify the query is in parameters
+ params = request_body.get("parameters", {})
+ assert "query" in params or "message" in params, "parameters should contain query or message"
+
+ # Verify the actual query value
+ actual_query = params.get("query") or params.get("message")
+ assert actual_query == "weather: Dallas", f"Query should be 'weather: Dallas', got '{actual_query}'"
+
+
+class TestIssue840A2AToolsNotListedInGlobalTools:
+ """Test suite for Issue #840 - Part 2: A2A tools not appearing in Global Tools.
+
+ The issue reports that tools from A2A Agents are not getting listed under
+ the Global Tools Tab.
+
+ When an A2A agent is registered, a corresponding MCP tool should be
+ automatically created with:
+ - name: "a2a_{agent_slug}"
+ - integration_type: "A2A"
+ - tags: ["a2a", "agent", ...agent_tags]
+
+ These tests verify that:
+ 1. A2A agent registration creates a corresponding tool
+ 2. The tool has correct integration_type="A2A"
+ 3. The tool appears in tool listing queries
+ 4. The tool has proper tags for filtering
+ """
+
+ @pytest.fixture
+ def tool_service(self):
+ """Create tool service instance."""
+ return ToolService()
+
+ @pytest.fixture
+ def a2a_service(self):
+ """Create A2A agent service instance."""
+ return A2AAgentService()
+
+ @pytest.fixture
+ def mock_db(self):
+ """Create mock database session."""
+ db = MagicMock(spec=Session)
+ db.get_bind.return_value.dialect.name = "sqlite"
+ return db
+
+ @pytest.fixture
+ def sample_a2a_agent_db(self):
+ """Sample database A2A agent for tool creation."""
+ agent_id = uuid.uuid4().hex
+ agent = MagicMock(spec=DbA2AAgent)
+ agent.id = agent_id
+ agent.name = "calculator-agent"
+ agent.slug = "calculator-agent"
+ agent.description = "A2A Agent that handles calc: and weather: queries"
+ agent.endpoint_url = "http://localhost:8000/run"
+ agent.agent_type = "custom"
+ agent.protocol_version = "1.0"
+ agent.capabilities = {"chat": True, "tools": True}
+ agent.config = {}
+ agent.auth_type = "none"
+ agent.auth_value = None
+ agent.enabled = True
+ agent.reachable = True
+ agent.tags = [{"id": "test", "label": "test"}]
+ agent.team_id = None
+ agent.owner_email = "admin@example.com"
+ agent.visibility = "public"
+ return agent
+
+ async def test_a2a_agent_registration_creates_tool(
+ self,
+ tool_service,
+ mock_db,
+ sample_a2a_agent_db,
+ ):
+ """Test that registering an A2A agent creates a corresponding tool.
+
+ Issue #840 reports that A2A agent tools are not showing in the
+ Global Tools Tab. This test verifies that a tool should be created
+ when an A2A agent is registered.
+ """
+ # Mock: No existing tool with the same name
+ mock_db.execute.return_value.scalar_one_or_none.return_value = None
+
+ # Mock the register_tool method to capture the tool data
+ created_tool_data = None
+
+ async def capture_register_tool(db, tool_data, **kwargs):
+ nonlocal created_tool_data
+ created_tool_data = tool_data
+ # Return a mock ToolRead
+ return MagicMock(
+ id=uuid.uuid4().hex,
+ name=tool_data.name,
+ integration_type=tool_data.integration_type,
+ tags=tool_data.tags,
+ )
+
+ with patch.object(tool_service, "register_tool", side_effect=capture_register_tool):
+ # Mock db.get to return a tool
+ mock_tool = MagicMock(spec=DbTool)
+ mock_tool.id = uuid.uuid4().hex
+ mock_db.get.return_value = mock_tool
+
+ # Create tool from A2A agent
+ _ = await tool_service.create_tool_from_a2a_agent(
+ db=mock_db,
+ agent=sample_a2a_agent_db,
+ created_by="admin@example.com",
+ )
+
+ # Verify tool was created
+ assert created_tool_data is not None, "Tool data should be created"
+
+ # Verify tool name follows convention
+ expected_tool_name = f"a2a_{sample_a2a_agent_db.slug}"
+ assert created_tool_data.name == expected_tool_name, f"Tool name should be '{expected_tool_name}'"
+
+ # CRITICAL: Verify integration_type is "A2A"
+ # This is crucial for the tool to be recognized as an A2A tool
+ assert created_tool_data.integration_type == "A2A", "Tool integration_type must be 'A2A'"
+
+ # Verify tags include "a2a" and "agent"
+ # Tags can be either strings or dicts with 'id'/'label' keys
+ def tag_contains(tags, value):
+ """Check if a tag value exists in the tags list (handles both string and dict formats)."""
+ for tag in tags:
+ if isinstance(tag, dict):
+ if tag.get("id") == value or tag.get("label") == value:
+ return True
+ elif tag == value:
+ return True
+ return False
+
+ assert tag_contains(created_tool_data.tags, "a2a"), "Tool tags must include 'a2a'"
+ assert tag_contains(created_tool_data.tags, "agent"), "Tool tags must include 'agent'"
+
+ # Verify annotations contain agent ID
+ assert "a2a_agent_id" in created_tool_data.annotations, "Annotations must include a2a_agent_id"
+ assert created_tool_data.annotations["a2a_agent_id"] == sample_a2a_agent_db.id
+
+ async def test_a2a_tool_should_appear_in_tool_listing(self, tool_service, mock_db):
+ """Test that A2A tools appear in the global tools listing.
+
+ Issue #840 reports that A2A agent tools don't appear in the Global
+ Tools Tab. This test verifies that list_tools should include tools
+ with integration_type="A2A".
+ """
+ # Create a mock A2A tool
+ a2a_tool_id = uuid.uuid4().hex
+ a2a_tool = MagicMock(spec=DbTool)
+ a2a_tool.id = a2a_tool_id
+ a2a_tool.name = "a2a_calculator-agent"
+ a2a_tool.original_name = "a2a_calculator-agent"
+ a2a_tool.integration_type = "A2A"
+ a2a_tool.enabled = True
+ a2a_tool.tags = [{"id": "a2a", "label": "a2a"}, {"id": "agent", "label": "agent"}]
+ a2a_tool.annotations = {"a2a_agent_id": "test-agent-id", "a2a_agent_type": "custom"}
+ a2a_tool.gateway_id = None
+ a2a_tool.gateway = None
+ a2a_tool.team_id = None
+ a2a_tool.owner_email = "admin@example.com"
+ a2a_tool.visibility = "public"
+ a2a_tool.created_at = datetime.now(timezone.utc)
+ a2a_tool.updated_at = datetime.now(timezone.utc)
+
+ # Mock database query to return the A2A tool
+ mock_db.execute.return_value.scalars.return_value.all.return_value = [a2a_tool]
+
+ # Mock convert_tool_to_read to return a proper ToolRead
+ mock_tool_read = MagicMock(spec=ToolRead)
+ mock_tool_read.id = a2a_tool_id
+ mock_tool_read.name = "a2a_calculator-agent"
+ mock_tool_read.integration_type = "A2A"
+ mock_tool_read.tags = ["a2a", "agent"]
+
+ with patch.object(tool_service, "convert_tool_to_read", return_value=mock_tool_read):
+ # List tools - this should include A2A tools
+ result = await tool_service.list_tools(
+ db=mock_db,
+ include_inactive=False,
+ )
+
+ # Result is a tuple (tools_list, next_cursor)
+ tools_list = result[0] if isinstance(result, tuple) else result
+
+ # CRITICAL: Verify A2A tool appears in the listing
+ # Issue #840 reports this is not happening
+ assert len(tools_list) >= 1, "A2A tool should appear in the tools listing"
+
+ # Find the A2A tool in the results
+ a2a_tools = [t for t in tools_list if getattr(t, "integration_type", None) == "A2A"]
+ assert len(a2a_tools) >= 1, "At least one A2A tool should be in the listing"
+
+ async def test_a2a_tool_filterable_by_tags(self, tool_service, mock_db):
+ """Test that A2A tools can be filtered by 'a2a' and 'agent' tags.
+
+ The Global Tools Tab likely filters by tags. A2A tools should have
+ the 'a2a' and 'agent' tags to be discoverable.
+ """
+ # Create mock A2A tool with proper tags
+ a2a_tool = MagicMock(spec=DbTool)
+ a2a_tool.id = uuid.uuid4().hex
+ a2a_tool.name = "a2a_calculator-agent"
+ a2a_tool.original_name = "a2a_calculator-agent"
+ a2a_tool.integration_type = "A2A"
+ a2a_tool.enabled = True
+ # Tags should be in the format used by the database
+ a2a_tool.tags = [{"id": "a2a", "label": "a2a"}, {"id": "agent", "label": "agent"}, {"id": "test", "label": "test"}]
+ a2a_tool.annotations = {"a2a_agent_id": "test-agent-id"}
+ a2a_tool.gateway_id = None
+ a2a_tool.gateway = None
+ a2a_tool.team_id = None
+ a2a_tool.owner_email = "admin@example.com"
+ a2a_tool.visibility = "public"
+ a2a_tool.created_at = datetime.now(timezone.utc)
+ a2a_tool.updated_at = datetime.now(timezone.utc)
+
+ # Mock database query with tag filter
+ mock_db.execute.return_value.scalars.return_value.all.return_value = [a2a_tool]
+
+ mock_tool_read = MagicMock(spec=ToolRead)
+ mock_tool_read.id = a2a_tool.id
+ mock_tool_read.name = "a2a_calculator-agent"
+ mock_tool_read.integration_type = "A2A"
+ mock_tool_read.tags = ["a2a", "agent", "test"]
+
+ with patch.object(tool_service, "convert_tool_to_read", return_value=mock_tool_read):
+ # Filter by "a2a" tag - should find the A2A tool
+ result = await tool_service.list_tools(
+ db=mock_db,
+ tags=["a2a"],
+ include_inactive=False,
+ )
+
+ tools_list = result[0] if isinstance(result, tuple) else result
+
+ # Verify A2A tool is found when filtering by "a2a" tag
+ assert len(tools_list) >= 1, "A2A tool should be found when filtering by 'a2a' tag"
+
+ async def test_tool_integration_type_a2a_is_preserved(self, mock_db):
+ """Test that integration_type='A2A' is preserved throughout the tool lifecycle.
+
+ This test verifies that when a tool is created for an A2A agent,
+ the integration_type='A2A' is properly set and preserved.
+ """
+ # The tool should be created with integration_type="A2A"
+ # as defined in tool_service.create_tool_from_a2a_agent()
+
+ # Document the expected ToolCreate data
+ expected_tool_create_fields = {
+ "name": "a2a_calculator-agent",
+ "integration_type": "A2A", # This is the key field
+ "tags": ["test", "a2a", "agent"], # Must include a2a and agent
+ "annotations": {
+ "a2a_agent_id": "agent-uuid-here",
+ "a2a_agent_type": "custom",
+ },
+ }
+
+ # Verify the expected structure
+ assert expected_tool_create_fields["integration_type"] == "A2A"
+ assert "a2a" in expected_tool_create_fields["tags"]
+ assert "agent" in expected_tool_create_fields["tags"]
+ assert "a2a_agent_id" in expected_tool_create_fields["annotations"]
+
+
+class TestIssue840ToolInvocationRouting:
+ """Test that A2A tools are properly routed to their agents when invoked.
+
+ Even if A2A tools appear in the listing, they need to be properly invoked
+ when called. The tool_service should recognize integration_type="A2A" and
+ route the call to the A2A agent service.
+ """
+
+ @pytest.fixture
+ def tool_service(self):
+ """Create tool service instance."""
+ return ToolService()
+
+ @pytest.fixture
+ def mock_db(self):
+ """Create mock database session."""
+ return MagicMock(spec=Session)
+
+ async def test_a2a_tool_invocation_routes_to_agent_service(self, tool_service, mock_db):
+ """Test that invoking an A2A tool routes to the A2A agent service.
+
+ When a tool with integration_type="A2A" is invoked, it should:
+ 1. Extract the a2a_agent_id from annotations
+ 2. Call A2AAgentService.invoke_agent()
+ 3. Return the agent's response
+ """
+ # Create mock A2A tool
+ a2a_tool = MagicMock(spec=DbTool)
+ a2a_tool.id = uuid.uuid4().hex
+ a2a_tool.name = "a2a_calculator-agent"
+ a2a_tool.integration_type = "A2A"
+ a2a_tool.annotations = {"a2a_agent_id": "test-agent-id", "a2a_agent_type": "custom"}
+ a2a_tool.enabled = True
+
+ # The tool invocation should check integration_type and route appropriately
+ # From tool_service.py line ~2365:
+ # if tool_integration_type == "A2A" and tool_annotations and "a2a_agent_id" in tool_annotations:
+ # return await self._invoke_a2a_tool(db=db, tool=tool_stub, arguments=arguments)
+
+ # Document expected behavior
+ assert a2a_tool.integration_type == "A2A", "Tool must have integration_type='A2A'"
+ assert "a2a_agent_id" in a2a_tool.annotations, "Tool annotations must include a2a_agent_id"
+
+ # When invoked with user arguments, the query should be passed to the agent.
+ # Example expected arguments format: {"query": "calc: 7*8"}
+
+ # The _invoke_a2a_tool method should:
+ # 1. Look up the A2A agent by ID from annotations
+ # 2. Call invoke_agent with the user's arguments
+ # 3. Return the agent's response
+
+ # This documents the expected flow for proper A2A tool invocation
+ expected_invocation_flow = [
+ "1. Receive tool invocation with arguments",
+ "2. Check integration_type == 'A2A'",
+ "3. Extract a2a_agent_id from annotations",
+ "4. Call A2AAgentService.invoke_agent(agent_id, arguments)",
+ "5. Return agent response as ToolResult",
+ ]
+
+ assert len(expected_invocation_flow) == 5
+
+
+class TestIssue840CustomAgentQueryFormat:
+ """Test that custom A2A agents receive query as string, not JSONRPC message object.
+
+ When invoking A2A tools through MCP, the code must handle agent_type correctly:
+ - For JSONRPC agents: convert query to nested message structure
+ - For custom agents: pass query directly as string
+
+ Bug: Custom agents were receiving query as JSONRPC message object:
+ {"message": {"messageId": "...", "role": "user", "parts": [{"type": "text", "text": "..."}]}}
+
+ Fix: Custom agents should receive flat parameters:
+ {"query": "...", "message": "..."}
+ """
+
+ @pytest.fixture
+ def tool_service(self):
+ """Create tool service instance."""
+ return ToolService()
+
+ @pytest.fixture
+ def mock_db(self):
+ """Create mock database session."""
+ return MagicMock(spec=Session)
+
+ @pytest.fixture
+ def custom_agent(self):
+ """Create a custom (non-JSONRPC) A2A agent."""
+ agent = MagicMock(spec=DbA2AAgent)
+ agent.id = uuid.uuid4().hex
+ agent.name = "custom-calculator-agent"
+ agent.endpoint_url = "http://localhost:9100/run"
+ agent.agent_type = "custom" # NOT jsonrpc
+ agent.protocol_version = "1.0"
+ agent.auth_type = None
+ agent.auth_value = None
+ agent.enabled = True
+ return agent
+
+ @pytest.fixture
+ def jsonrpc_agent(self):
+ """Create a JSONRPC A2A agent."""
+ agent = MagicMock(spec=DbA2AAgent)
+ agent.id = uuid.uuid4().hex
+ agent.name = "jsonrpc-agent"
+ agent.endpoint_url = "http://localhost:9999/"
+ agent.agent_type = "jsonrpc"
+ agent.protocol_version = "1.0"
+ agent.auth_type = None
+ agent.auth_value = None
+ agent.enabled = True
+ return agent
+
+ @patch("mcpgateway.services.http_client_service.get_http_client")
+ async def test_custom_agent_receives_string_query(
+ self,
+ mock_get_client,
+ tool_service,
+ custom_agent,
+ ):
+ """Test that custom agents receive query as a string, not JSONRPC message object.
+
+ This is the core bug fix verification: when MCP Tool invokes a custom A2A agent,
+ the agent should receive {"parameters": {"query": "calc: 7*8"}} NOT
+ {"parameters": {"message": {"messageId": ..., "parts": [...]}}}
+ """
+ # Mock HTTP client to capture request
+ mock_client = AsyncMock()
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"response": "56"}
+ mock_client.post.return_value = mock_response
+ mock_get_client.return_value = mock_client
+
+ # Call the agent with a flat query (how MCP tool calls it)
+ parameters = {"query": "calc: 7*8"}
+ await tool_service._call_a2a_agent(custom_agent, parameters)
+
+ # Verify HTTP call was made
+ mock_client.post.assert_called_once()
+ call_args = mock_client.post.call_args
+
+ # Extract the JSON body
+ request_body = call_args.kwargs.get("json") or call_args[1].get("json")
+ assert request_body is not None, "Request body should not be None"
+
+ # CRITICAL: For custom agents, parameters should contain flat query string
+ params = request_body.get("parameters", {})
+
+ # The query should be a STRING, not an object
+ query_value = params.get("query")
+ assert query_value is not None, "parameters.query should exist"
+ assert isinstance(query_value, str), f"parameters.query should be string, got {type(query_value)}"
+ assert query_value == "calc: 7*8", f"Query should be 'calc: 7*8', got '{query_value}'"
+
+ # Verify that 'message' is NOT a nested object (JSONRPC format)
+ message_value = params.get("message")
+ if message_value is not None:
+ assert not isinstance(message_value, dict), "parameters.message should NOT be JSONRPC object for custom agents"
+
+ @patch("mcpgateway.services.http_client_service.get_http_client")
+ async def test_jsonrpc_agent_receives_nested_message(
+ self,
+ mock_get_client,
+ tool_service,
+ jsonrpc_agent,
+ ):
+ """Test that JSONRPC agents receive query as nested message structure.
+
+ JSONRPC agents expect the A2A protocol format:
+ {"jsonrpc": "2.0", "method": "message/send", "params": {"message": {...}}}
+ """
+ # Mock HTTP client
+ mock_client = AsyncMock()
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"result": {"text": "Hello"}}
+ mock_client.post.return_value = mock_response
+ mock_get_client.return_value = mock_client
+
+ # Call the agent with a flat query
+ parameters = {"query": "Hello world"}
+ await tool_service._call_a2a_agent(jsonrpc_agent, parameters)
+
+ # Verify HTTP call
+ mock_client.post.assert_called_once()
+ call_args = mock_client.post.call_args
+ request_body = call_args.kwargs.get("json") or call_args[1].get("json")
+
+ # For JSONRPC agents, request should be JSONRPC format
+ assert request_body.get("jsonrpc") == "2.0", "Should be JSONRPC format"
+ assert "params" in request_body, "Should have params"
+
+ # params.message should be a nested object
+ params = request_body.get("params", {})
+ message = params.get("message")
+ assert isinstance(message, dict), "params.message should be dict for JSONRPC agents"
+ assert "parts" in message, "message should have parts array"
diff --git a/uv.lock b/uv.lock
index 380a88231..b38eea9a8 100644
--- a/uv.lock
+++ b/uv.lock
@@ -10,6 +10,29 @@ resolution-markers = [
"(python_full_version < '3.12' and platform_machine != 'x86_64') or (python_full_version < '3.12' and sys_platform != 'darwin')",
]
+[[package]]
+name = "a2a-sdk"
+version = "0.3.22"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "google-api-core" },
+ { name = "httpx" },
+ { name = "httpx-sse" },
+ { name = "protobuf" },
+ { name = "pydantic" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/92/a3/76f2d94a32a1b0dc760432d893a09ec5ed31de5ad51b1ef0f9d199ceb260/a2a_sdk-0.3.22.tar.gz", hash = "sha256:77a5694bfc4f26679c11b70c7f1062522206d430b34bc1215cfbb1eba67b7e7d", size = 231535, upload-time = "2025-12-16T18:39:21.19Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/64/e8/f4e39fd1cf0b3c4537b974637143f3ebfe1158dad7232d9eef15666a81ba/a2a_sdk-0.3.22-py3-none-any.whl", hash = "sha256:b98701135bb90b0ff85d35f31533b6b7a299bf810658c1c65f3814a6c15ea385", size = 144347, upload-time = "2025-12-16T18:39:19.218Z" },
+]
+
+[package.optional-dependencies]
+http-server = [
+ { name = "fastapi" },
+ { name = "sse-starlette" },
+ { name = "starlette" },
+]
+
[[package]]
name = "agent-lifecycle-toolkit"
version = "0.10.0"
@@ -1612,6 +1635,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f9/27/a1ec008ece77000bb9c56a92fd5c844ecf13943198fe3978d27e890ece5c/geventhttpclient-2.3.7-cp313-cp313-win_amd64.whl", hash = "sha256:37ffa13c2a3b5311c92cd9355cb6ba077e74c2e5d34cd692e25b42549fa350d5", size = 48997, upload-time = "2025-12-07T19:48:22.294Z" },
]
+[[package]]
+name = "google-api-core"
+version = "2.29.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "google-auth" },
+ { name = "googleapis-common-protos" },
+ { name = "proto-plus" },
+ { name = "protobuf" },
+ { name = "requests" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/0d/10/05572d33273292bac49c2d1785925f7bc3ff2fe50e3044cf1062c1dde32e/google_api_core-2.29.0.tar.gz", hash = "sha256:84181be0f8e6b04006df75ddfe728f24489f0af57c96a529ff7cf45bc28797f7", size = 177828, upload-time = "2026-01-08T22:21:39.269Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/77/b6/85c4d21067220b9a78cfb81f516f9725ea6befc1544ec9bd2c1acd97c324/google_api_core-2.29.0-py3-none-any.whl", hash = "sha256:d30bc60980daa36e314b5d5a3e5958b0200cb44ca8fa1be2b614e932b75a3ea9", size = 173906, upload-time = "2026-01-08T22:21:36.093Z" },
+]
+
[[package]]
name = "google-auth"
version = "2.45.0"
@@ -3363,6 +3402,7 @@ toolops = [
[package.dev-dependencies]
dev = [
+ { name = "a2a-sdk", extra = ["http-server"] },
{ name = "aiohttp" },
{ name = "argparse-manpage" },
{ name = "autoflake" },
@@ -3512,6 +3552,7 @@ provides-extras = ["redis", "redis-pure", "postgres", "mysql", "mariadb", "llmch
[package.metadata.requires-dev]
dev = [
+ { name = "a2a-sdk", extras = ["http-server"] },
{ name = "aiohttp", specifier = ">=3.12.15" },
{ name = "argparse-manpage", specifier = ">=4.7" },
{ name = "autoflake", specifier = ">=2.3.1" },
@@ -4817,6 +4858,18 @@ with-everything = [
{ name = "vulture" },
]
+[[package]]
+name = "proto-plus"
+version = "1.27.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "protobuf" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/01/89/9cbe2f4bba860e149108b683bc2efec21f14d5f7ed6e25562ad86acbc373/proto_plus-1.27.0.tar.gz", hash = "sha256:873af56dd0d7e91836aee871e5799e1c6f1bda86ac9a983e0bb9f0c266a568c4", size = 56158, upload-time = "2025-12-16T13:46:25.729Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/cd/24/3b7a0818484df9c28172857af32c2397b6d8fcd99d9468bd4684f98ebf0a/proto_plus-1.27.0-py3-none-any.whl", hash = "sha256:1baa7f81cf0f8acb8bc1f6d085008ba4171eaf669629d1b6d1673b21ed1c0a82", size = 50205, upload-time = "2025-12-16T13:46:24.76Z" },
+]
+
[[package]]
name = "protobuf"
version = "6.33.2"