Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 88 additions & 23 deletions core/framework/mcp/agent_builder_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class BuildSession:
"""Build session with persistence support."""

def __init__(self, name: str, session_id: str | None = None):
# Validate session_id if provided
if session_id:
_safe_path_segment(session_id)

self.id = session_id or f"build_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
self.name = name
self.goal: Goal | None = None
Expand Down Expand Up @@ -133,6 +137,40 @@ def from_dict(cls, data: dict) -> "BuildSession":
_session: BuildSession | None = None


def _safe_path_segment(value: str) -> str:
"""Validate a path parameter is a safe filesystem name.

Prevents path traversal by rejecting strings with separators or '..'.
"""
if not value:
raise ValueError("Path segment cannot be empty")
if "/" in value or "\\" in value or ".." in value:
raise ValueError(f"Invalid path segment '{value}': path separators or '..' not allowed")
return value


def _safe_export_path(agent_path: str) -> Path:
    """Validate and normalize *agent_path* for export/import.

    Guarantees the returned path lives inside the project's ``exports/``
    directory; any traversal attempt raises ValueError.

    Args:
        agent_path: Relative path (subdirectories allowed) under ``exports/``.

    Returns:
        The fully resolved absolute Path inside the exports root.

    Raises:
        ValueError: If agent_path is empty or escapes ``exports/``.
    """
    if not agent_path:
        raise ValueError("agent_path is required")

    # Strip any mix of leading slashes/backslashes so an "absolute"
    # looking input is treated as relative to the exports root.
    # (The previous two-step lstrip missed interleaved '/\\' prefixes.)
    clean_path = agent_path.lstrip("/\\")

    # Resolve both sides so symlinks and '..' components are normalized
    # before the containment check.
    exports_root = (_PROJECT_ROOT / "exports").resolve()
    full_path = (exports_root / clean_path).resolve()

    # BUG FIX: the previous str.startswith() comparison accepted sibling
    # directories such as 'exports_evil/' because it compared raw string
    # prefixes. Path.is_relative_to performs a component-wise check.
    if not full_path.is_relative_to(exports_root):
        raise ValueError(f"Invalid agent path '{agent_path}': must stay within 'exports/'")

    return full_path


def _ensure_sessions_dir():
    """Create the sessions directory (and any missing parents) if absent."""
    # parents=True makes this robust on a fresh checkout where the
    # parent directory may not exist yet; exist_ok keeps it idempotent.
    SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
Expand All @@ -142,22 +180,26 @@ def _save_session(session: BuildSession):
"""Save session to disk."""
_ensure_sessions_dir()

# Validate session ID
safe_id = _safe_path_segment(session.id)

# Update last modified
session.last_modified = datetime.now().isoformat()

# Save session file
session_file = SESSIONS_DIR / f"{session.id}.json"
session_file = SESSIONS_DIR / f"{safe_id}.json"
with atomic_write(session_file) as f:
json.dump(session.to_dict(), f, indent=2, default=str)

# Update active session pointer
with atomic_write(ACTIVE_SESSION_FILE) as f:
f.write(session.id)
f.write(safe_id)


def _load_session(session_id: str) -> BuildSession:
"""Load session from disk."""
session_file = SESSIONS_DIR / f"{session_id}.json"
safe_id = _safe_path_segment(session_id)
session_file = SESSIONS_DIR / f"{safe_id}.json"
if not session_file.exists():
raise ValueError(f"Session '{session_id}' not found")

Expand All @@ -177,6 +219,7 @@ def _load_active_session() -> BuildSession | None:
session_id = f.read().strip()

if session_id:
# session_id in the pointer file should already be safe
return _load_session(session_id)
except Exception as e:
logging.warning("Failed to load active session: %s", e)
Expand Down Expand Up @@ -268,11 +311,13 @@ def load_session_by_id(session_id: Annotated[str, "ID of the session to load"])
global _session

try:
_session = _load_session(session_id)
# Validate session_id
safe_id = _safe_path_segment(session_id)
_session = _load_session(safe_id)

# Update active session pointer
with atomic_write(ACTIVE_SESSION_FILE) as f:
f.write(session_id)
f.write(safe_id)

return json.dumps(
{
Expand All @@ -296,29 +341,32 @@ def delete_session(session_id: Annotated[str, "ID of the session to delete"]) ->
"""Delete a saved agent building session."""
global _session

session_file = SESSIONS_DIR / f"{session_id}.json"
if not session_file.exists():
return json.dumps({"success": False, "error": f"Session '{session_id}' not found"})

try:
# Validate session_id
safe_id = _safe_path_segment(session_id)

session_file = SESSIONS_DIR / f"{safe_id}.json"
if not session_file.exists():
return json.dumps({"success": False, "error": f"Session '{safe_id}' not found"})

# Remove session file
session_file.unlink()

# Clear active session if it was the deleted one
if _session and _session.id == session_id:
if _session and _session.id == safe_id:
_session = None

if ACTIVE_SESSION_FILE.exists():
with open(ACTIVE_SESSION_FILE) as f:
active_id = f.read().strip()
if active_id == session_id:
if active_id == safe_id:
ACTIVE_SESSION_FILE.unlink()

return json.dumps(
{
"success": True,
"deleted_session_id": session_id,
"message": f"Session '{session_id}' deleted successfully",
"deleted_session_id": safe_id,
"message": f"Session '{safe_id}' deleted successfully",
}
)
except Exception as e:
Expand Down Expand Up @@ -1636,9 +1684,15 @@ def export_graph() -> str:
if not validation["valid"]:
return json.dumps({"success": False, "errors": validation["errors"]})

# Validate session name for filesystem safety
try:
safe_name = _safe_path_segment(session.name)
except ValueError as e:
return json.dumps({"success": False, "error": f"Invalid session name: {e}"})

entry_node = validation["entry_node"]
terminal_nodes = validation["terminal_nodes"]

# ... (rest of function until file writing)
# Extract pause/resume configuration from validation
pause_nodes = validation.get("pause_nodes", [])
resume_entry_points = validation.get("resume_entry_points", [])
Expand Down Expand Up @@ -1724,7 +1778,7 @@ def export_graph() -> str:

# Build GraphSpec
graph_spec = {
"id": f"{session.name}-graph",
"id": f"{safe_name}-graph",
"goal_id": session.goal.id,
"version": "1.0.0",
"entry_node": entry_node,
Expand All @@ -1751,7 +1805,7 @@ def export_graph() -> str:
# Build export data
export_data = {
"agent": {
"id": session.name,
"id": safe_name,
"name": session.goal.name,
"version": "1.0.0",
"description": session.goal.description,
Expand All @@ -1775,9 +1829,12 @@ def export_graph() -> str:
export_data["goal"]["success_criteria"] = enriched_criteria

# === WRITE FILES TO DISK ===
# Create exports directory
exports_dir = Path("exports") / session.name
exports_dir.mkdir(parents=True, exist_ok=True)
# Use _safe_export_path to ensure files stay within exports/
try:
exports_dir = _safe_export_path(safe_name)
exports_dir.mkdir(parents=True, exist_ok=True)
except ValueError as e:
return json.dumps({"success": False, "error": str(e)})

# Write agent.json
agent_json_path = exports_dir / "agent.json"
Expand Down Expand Up @@ -1859,14 +1916,22 @@ def import_from_export(
"""
session = get_session()

path = Path(agent_json_path)
if not path.exists():
return json.dumps({"success": False, "error": f"File not found: {agent_json_path}"})

try:
# Use _safe_export_path to prevent reading files outside exports/
# (assuming we want to restrict imports to the exports/ directory)
# Note: if users want to import from anywhere, we'd need a different check
path = _safe_export_path(agent_json_path)

if not path.exists():
return json.dumps({"success": False, "error": f"File not found: {agent_json_path}"})

data = json.loads(path.read_text(encoding="utf-8"))
except ValueError as e:
return json.dumps({"success": False, "error": f"Invalid path: {e}"})
except json.JSONDecodeError as e:
return json.dumps({"success": False, "error": f"Invalid JSON: {e}"})
except Exception as e:
return json.dumps({"success": False, "error": f"Failed to read file: {e}"})

try:
# Parse goal (same pattern as BuildSession.from_dict lines 88-99)
Expand Down
85 changes: 85 additions & 0 deletions core/tests/test_agent_builder_path_traversal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import json
import unittest
from pathlib import Path
import tempfile
import os
import sys

# Add core to sys.path to allow imports
sys.path.insert(0, os.getcwd())

from framework.mcp.agent_builder_server import (
_safe_path_segment,
_safe_export_path,
delete_session,
SESSIONS_DIR
)

class TestAgentBuilderPathTraversal(unittest.TestCase):
    """Path traversal protection tests for agent_builder_server."""

    def test_safe_path_segment_valid(self):
        """Plain names pass through unchanged."""
        for name in ("my_agent", "build_123", "session-abc"):
            self.assertEqual(_safe_path_segment(name), name)

    def test_safe_path_segment_invalid(self):
        """Traversal attempts and empty strings are rejected."""
        for bad in ("../etc/passwd", "sub/dir", "..", ""):
            with self.assertRaises(ValueError):
                _safe_path_segment(bad)

    def test_safe_export_path_valid(self):
        """Paths under exports/ resolve and keep their structure."""
        resolved = _safe_export_path("my_agent")
        self.assertEqual(resolved.name, "my_agent")
        self.assertIn("exports", str(resolved))

        nested = _safe_export_path("group/my_agent")
        self.assertEqual(nested.name, "my_agent")
        self.assertEqual(nested.parent.name, "group")
        self.assertIn("exports", str(nested))

    def test_safe_export_path_invalid(self):
        """Escaping the exports/ directory raises ValueError."""
        # '../../etc/passwd' resolves outside exports/ and must be blocked.
        with self.assertRaises(ValueError):
            _safe_export_path("../../etc/passwd")
        # A path that merely *looks* absolute may normalize to somewhere
        # inside exports/ (e.g. exports/etc/passwd) — that is allowed by
        # the current implementation and considered safe.

    def test_delete_session_protection(self):
        """delete_session must refuse traversal-style session ids."""
        # Place a sacrificial file outside SESSIONS_DIR.
        with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as handle:
            target = Path(handle.name)
        victim_stem = target.stem  # name without the .json suffix

        try:
            # Attempt deletion through a '../' traversal id.
            payload = json.loads(delete_session(f"../{victim_stem}"))

            self.assertFalse(payload["success"])
            self.assertIn("error", payload)
            self.assertTrue(target.exists(), "File should NOT have been deleted!")

        finally:
            if target.exists():
                target.unlink()

if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ Add to `.vscode/settings.json`:
3. **Use real provider keys in non-production environments** - validate configuration with low-risk inputs before production rollout
4. **Credential isolation** - Each tool validates its own credentials at runtime


## Troubleshooting

### "ModuleNotFoundError: No module named 'framework'"
Expand Down
2 changes: 1 addition & 1 deletion docs/developer-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Get API keys:
### Install Claude Code Skills

```bash
# Install building-agents and testing-agent skills
# Install building-agents and testing-agents skills
./quickstart.sh
```

Expand Down
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This guide will help you set up the Aden Agent Framework and build your first ag
- **Python 3.11+** ([Download](https://www.python.org/downloads/)) - Python 3.12 or 3.13 recommended
- **pip** - Package installer for Python (comes with Python)
- **git** - Version control
- **Claude Code** ([Install](https://docs.anthropic.com/claude/docs/claude-code)) - Optional, for using building skills
- **Claude Code** ([Install](https://docs.anthropic.com/claude/docs/claude-code)) - Optional, for using builder skills

## Quick Start

Expand Down
2 changes: 1 addition & 1 deletion docs/pr-local-credential-parity.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Summary

Gives local API key credentials (Brave Search, GitHub, Exa, Stripe, etc.) the same feature set as Aden OAuth credentials: named aliases, identity metadata, status tracking, CRUD management, and full visibility in the credential tester.
Give local API key credentials (Brave Search, GitHub, Exa, Stripe, etc.) the same feature set as Aden OAuth credentials: named aliases, identity metadata, status tracking, CRUD management, and full visibility in the credential tester.

Fixes a bug where credentials configured with the existing `store_credential` MCP tool were invisible in the credential tester account picker.

Expand Down