Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions docs/docs/using/agents/a2a.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,107 @@ Associate A2A agents with virtual servers to:
- **Audit Logging**: All agent interactions are logged
- **Network Security**: HTTPS support and SSL verification

## Local Testing

### Demo A2A Agent

The repository includes a demo A2A agent with calculator and weather tools for local testing:

```bash
# Terminal 1: Start ContextForge
make dev

# Terminal 2: Start the demo agent (auto-registers with ContextForge)
uv run python scripts/demo_a2a_agent.py
```

The demo agent supports these query formats:

| Query | Example | Response |
|-------|---------|----------|
| Calculator | `calc: 7*8+2` | `58` |
| Weather | `weather: Dallas` | `The weather in Dallas is sunny, 25C` |

**Test via Admin UI:**

1. Go to `http://localhost:8000/admin`
2. Click the "A2A Agents" tab
3. Find "Demo Calculator Agent" and click **Test**
4. Enter a query like `calc: 100/4+25` in the modal
5. Click **Run Test** to see the result

**Test via API:**

```bash
# Get a token
export TOKEN=$(python -m mcpgateway.utils.create_jwt_token \
--username [email protected] --exp 60 --secret my-test-key)

# Invoke the agent
curl -X POST "http://localhost:8000/a2a/demo-calculator-agent/invoke" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"query": "calc: 15*4+10"}'
```

### A2A SDK HelloWorld Sample

Test with the official A2A Python SDK sample:

```bash
# Clone and run the HelloWorld agent
git clone https://github.com/google/a2a-samples.git
cd a2a-samples/samples/python/agents/helloworld
uv run python __main__.py # Starts on port 9999

# Register with ContextForge (in another terminal)
export TOKEN=$(python -m mcpgateway.utils.create_jwt_token \
--username [email protected] --exp 60 --secret my-test-key)

curl -X POST "http://localhost:8000/a2a" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"agent": {
"name": "Hello World Agent",
"endpoint_url": "http://localhost:9999/",
"agent_type": "jsonrpc",
"description": "Official A2A SDK HelloWorld sample"
},
"visibility": "public"
}'
```

### Admin UI Query Input

The Admin UI test button opens a modal where you can enter custom queries:

1. **Open Modal**: Click the blue **Test** button next to any A2A agent
2. **Enter Query**: Type your query in the textarea (e.g., `calc: 5*10+2`)
3. **Run Test**: Click **Run Test** to send the query to the agent
4. **View Response**: The agent's response appears in the modal

This allows testing A2A agents with real user queries instead of hardcoded test messages.

### Testing via MCP Tools

A2A agents are automatically exposed as MCP tools. After registration, invoke them via the MCP protocol:

```bash
curl -X POST "http://localhost:8000/rpc" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "a2a_demo-calculator-agent",
"arguments": {"query": "calc: 99+1"}
},
"id": 1
}'
```

## Troubleshooting

### Agent Not Responding
Expand Down
19 changes: 14 additions & 5 deletions mcpgateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13720,15 +13720,15 @@ async def admin_delete_a2a_agent(
@admin_router.post("/a2a/{agent_id}/test")
async def admin_test_a2a_agent(
agent_id: str,
request: Request, # pylint: disable=unused-argument
request: Request,
db: Session = Depends(get_db),
user=Depends(get_current_user_with_permissions), # pylint: disable=unused-argument
user=Depends(get_current_user_with_permissions),
) -> JSONResponse:
"""Test A2A agent via admin UI.

Args:
agent_id: Agent ID
request: FastAPI request object
request: FastAPI request object containing optional 'query' field
db: Database session
user: Authenticated user

Expand All @@ -13746,16 +13746,25 @@ async def admin_test_a2a_agent(
# Get the agent by ID
agent = await a2a_service.get_agent(db, agent_id)

# Parse request body to get user-provided query
default_message = "Hello from MCP Gateway Admin UI test!"
try:
body = await request.json()
# Use 'or' to also handle empty string queries
user_query = (body.get("query") if body else None) or default_message
except Exception:
user_query = default_message

# Prepare test parameters based on agent type and endpoint
if agent.agent_type in ["generic", "jsonrpc"] or agent.endpoint_url.endswith("/"):
# JSONRPC format for agents that expect it
test_params = {
"method": "message/send",
"params": {"message": {"messageId": f"admin-test-{int(time.time())}", "role": "user", "parts": [{"type": "text", "text": "Hello from MCP Gateway Admin UI test!"}]}},
"params": {"message": {"messageId": f"admin-test-{int(time.time())}", "role": "user", "parts": [{"type": "text", "text": user_query}]}},
}
else:
# Generic test format
test_params = {"message": "Hello from MCP Gateway Admin UI test!", "test": True, "timestamp": int(time.time())}
test_params = {"query": user_query, "message": user_query, "test": True, "timestamp": int(time.time())}

# Invoke the agent
result = await a2a_service.invoke_agent(
Expand Down
80 changes: 53 additions & 27 deletions mcpgateway/services/a2a_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,40 +313,66 @@ async def register_agent(
)

db.add(new_agent)
db.flush() # Flush to get the agent ID without committing yet
# Commit agent FIRST to ensure it persists even if tool creation fails
# This is critical because ToolService.register_tool calls db.rollback()
# on error, which would undo a pending (flushed but uncommitted) agent
db.commit()
db.refresh(new_agent)

# Invalidate caches since agent count changed
a2a_stats_cache.invalidate()
cache = _get_registry_cache()
await cache.invalidate_agents()
# Also invalidate tags cache since agent tags may have changed
# First-Party
from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel
# Wrapped in try/except to ensure cache failures don't fail the request
# when the agent is already successfully committed
try:
a2a_stats_cache.invalidate()
cache = _get_registry_cache()
await cache.invalidate_agents()
# Also invalidate tags cache since agent tags may have changed
# First-Party
from mcpgateway.cache.admin_stats_cache import admin_stats_cache # pylint: disable=import-outside-toplevel

await admin_stats_cache.invalidate_tags()
# First-Party
from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel
await admin_stats_cache.invalidate_tags()
# First-Party
from mcpgateway.cache.metrics_cache import metrics_cache # pylint: disable=import-outside-toplevel

metrics_cache.invalidate("a2a")
metrics_cache.invalidate("a2a")
except Exception as cache_error:
logger.warning(f"Cache invalidation failed after agent commit: {cache_error}")

# Automatically create a tool for the A2A agent if not already present
tool_service = ToolService()
tool_db = await tool_service.create_tool_from_a2a_agent(
db=db,
agent=new_agent,
created_by=created_by,
created_from_ip=created_from_ip,
created_via=created_via,
created_user_agent=created_user_agent,
)

# Associate the tool with the agent using the relationship
# This sets both the tool_id foreign key and the tool relationship
new_agent.tool = tool_db
db.commit()
db.refresh(new_agent)
# Tool creation is wrapped in try/except to ensure agent registration succeeds
# even if tool creation fails (e.g., due to visibility or permission issues)
tool_db = None
try:
tool_service = ToolService()
tool_db = await tool_service.create_tool_from_a2a_agent(
db=db,
agent=new_agent,
created_by=created_by,
created_from_ip=created_from_ip,
created_via=created_via,
created_user_agent=created_user_agent,
)

logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) with tool ID: {tool_db.id}")
# Associate the tool with the agent using the relationship
# This sets both the tool_id foreign key and the tool relationship
new_agent.tool = tool_db
db.commit()
db.refresh(new_agent)
logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) with tool ID: {tool_db.id}")
except Exception as tool_error:
# Log the error but don't fail agent registration
# Agent was already committed above, so it persists even if tool creation fails
logger.warning(f"Failed to create tool for A2A agent {new_agent.name}: {tool_error}")
structured_logger.warning(
f"A2A agent '{new_agent.name}' created without tool association",
user_id=created_by,
resource_type="a2a_agent",
resource_id=str(new_agent.id),
custom_fields={"error": str(tool_error), "agent_name": new_agent.name},
)
# Refresh the agent to ensure it's in a clean state after any rollback
db.refresh(new_agent)
logger.info(f"Registered new A2A agent: {new_agent.name} (ID: {new_agent.id}) without tool")

# Log A2A agent registration for lifecycle tracking
structured_logger.info(
Expand Down
38 changes: 23 additions & 15 deletions mcpgateway/services/tool_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3738,6 +3738,10 @@ async def create_tool_from_a2a_agent(
tags=normalized_tags,
)

# Default to "public" visibility if agent visibility is not set
# This ensures A2A tools are visible in the Global Tools Tab
tool_visibility = agent.visibility or "public"

tool_read = await self.register_tool(
db,
tool_data,
Expand All @@ -3747,7 +3751,7 @@ async def create_tool_from_a2a_agent(
created_user_agent=created_user_agent,
team_id=agent.team_id,
owner_email=agent.owner_email,
visibility=agent.visibility,
visibility=tool_visibility,
)

# Return the DbTool object for relationship assignment
Expand Down Expand Up @@ -3919,29 +3923,33 @@ async def _call_a2a_agent(self, agent: DbA2AAgent, parameters: Dict[str, Any]):
Exception: If the call fails.
"""
logger.info(f"Calling A2A agent '{agent.name}' at {agent.endpoint_url} with arguments: {parameters}")
# Patch: Build correct JSON-RPC params structure from flat UI input
params = None
# If UI sends flat fields, convert to nested message structure
if isinstance(parameters, dict) and "query" in parameters and isinstance(parameters["query"], str):
# Build the nested message object
message_id = f"admin-test-{int(time.time())}"
params = {"message": {"messageId": message_id, "role": "user", "parts": [{"type": "text", "text": parameters["query"]}]}}
method = parameters.get("method", "message/send")
else:
# Already in correct format or unknown, pass through
params = parameters.get("params", parameters)
method = parameters.get("method", "message/send")

# Build request data based on agent type
if agent.agent_type in ["generic", "jsonrpc"] or agent.endpoint_url.endswith("/"):
# JSONRPC agents: Convert flat query to nested message structure
params = None
if isinstance(parameters, dict) and "query" in parameters and isinstance(parameters["query"], str):
# Build the nested message object for JSONRPC protocol
message_id = f"admin-test-{int(time.time())}"
params = {"message": {"messageId": message_id, "role": "user", "parts": [{"type": "text", "text": parameters["query"]}]}}
method = parameters.get("method", "message/send")
else:
# Already in correct format or unknown, pass through
params = parameters.get("params", parameters)
method = parameters.get("method", "message/send")

try:
request_data = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
logger.info(f"invoke tool JSONRPC request_data prepared: {request_data}")
except Exception as e:
logger.error(f"Error preparing JSONRPC request data: {e}")
raise
else:
logger.info(f"invoke tool Using custom A2A format for A2A agent '{parameters}'")
request_data = {"interaction_type": parameters.get("interaction_type", "query"), "parameters": params, "protocol_version": agent.protocol_version}
# Custom agents: Pass parameters directly without JSONRPC message conversion
# Custom agents expect flat fields like {"query": "...", "message": "..."}
params = parameters if isinstance(parameters, dict) else {}
logger.info(f"invoke tool Using custom A2A format for A2A agent '{params}'")
request_data = {"interaction_type": params.get("interaction_type", "query"), "parameters": params, "protocol_version": agent.protocol_version}
logger.info(f"invoke tool request_data prepared: {request_data}")
# Make HTTP request to the agent endpoint using shared HTTP client
# First-Party
Expand Down
Loading
Loading