Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions squadron/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,40 @@ async def create_task(request: Request):
return {"success": False, "error": str(e)}


@app.post("/tasks/{task_id}/finalize")
async def finalize_task(task_id: str, request: Request):
"""
Finalize a task by merging or discarding its worktree.

Body:
{
"action": "merge" | "discard"
}
"""
try:
body = await request.json()
action = body.get("action", "merge")

from squadron.swarm.overseer import overseer
success = overseer.finalize_task(task_id, action)

if success:
return {"success": True, "task_id": task_id, "action": action}
return {"success": False, "error": "Failed to finalize task"}
except Exception as e:
return {"success": False, "error": str(e)}


@app.get("/worktrees")
async def get_worktrees():
"""Get list of active worktrees for all tasks."""
try:
from squadron.swarm.overseer import overseer
return {"worktrees": overseer.list_worktrees()}
except Exception as e:
return {"worktrees": [], "error": str(e)}



@app.get("/activity")
async def activity_stream():
Expand Down
222 changes: 222 additions & 0 deletions squadron/services/worktree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
"""
Git Worktree Management for Squadron Agent Tasks

Each agent task gets its own isolated Git worktree to enable:
- Safe parallel execution (no conflicts between agents)
- Protected main branch (changes only via explicit merge)
- Easy discard of failed work
"""

import subprocess
import os
import logging
from typing import Optional, List, Dict
from pathlib import Path

logger = logging.getLogger('Worktree')

# Default worktree directory (relative to repo root)
WORKTREE_DIR = ".worktrees"


def get_repo_root() -> str:
"""Get the root directory of the current Git repository."""
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True
)
if result.returncode != 0:
raise RuntimeError("Not in a Git repository")
return result.stdout.strip()


def create_worktree(task_id: str, base_branch: str = "main") -> str:
"""
Create an isolated Git worktree for a task.

Args:
task_id: Unique identifier for the task
base_branch: Branch to base the worktree on (default: main)

Returns:
Absolute path to the worktree directory

Example:
worktree_path = create_worktree("task-123")
# Creates: .worktrees/task-123/ with branch squadron/task-123
"""
repo_root = get_repo_root()
worktree_path = os.path.join(repo_root, WORKTREE_DIR, f"task-{task_id}")
branch_name = f"squadron/task-{task_id}"

# Ensure worktree directory exists
os.makedirs(os.path.dirname(worktree_path), exist_ok=True)

# Check if worktree already exists
if os.path.exists(worktree_path):
logger.warning(f"Worktree already exists: {worktree_path}")
return worktree_path

logger.info(f"Creating worktree: {worktree_path} (branch: {branch_name})")

# Create the worktree with a new branch
result = subprocess.run(
["git", "worktree", "add", worktree_path, "-b", branch_name, base_branch],
capture_output=True,
text=True,
cwd=repo_root
)

if result.returncode != 0:
raise RuntimeError(f"Failed to create worktree: {result.stderr}")

logger.info(f"✅ Worktree created: {worktree_path}")
return worktree_path


def cleanup_worktree(task_id: str, merge: bool = False, delete_branch: bool = True) -> bool:
"""
Remove a worktree after task completion.

Args:
task_id: Unique identifier for the task
merge: If True, merge the task branch into main before cleanup
delete_branch: If True, delete the branch after removing worktree

Returns:
True if cleanup was successful
"""
repo_root = get_repo_root()
worktree_path = os.path.join(repo_root, WORKTREE_DIR, f"task-{task_id}")
branch_name = f"squadron/task-{task_id}"

if not os.path.exists(worktree_path):
logger.warning(f"Worktree does not exist: {worktree_path}")
return False

# Optionally merge before cleanup
if merge:
logger.info(f"Merging branch {branch_name} into main...")

# Checkout main first
subprocess.run(["git", "checkout", "main"], cwd=repo_root, capture_output=True)

# Merge the task branch
result = subprocess.run(
["git", "merge", branch_name, "--no-ff", "-m", f"Merge task {task_id}"],
capture_output=True,
text=True,
cwd=repo_root
)

if result.returncode != 0:
logger.error(f"Merge failed: {result.stderr}")
return False

logger.info(f"✅ Merged {branch_name} into main")

# Remove the worktree
logger.info(f"Removing worktree: {worktree_path}")
result = subprocess.run(
["git", "worktree", "remove", worktree_path, "--force"],
capture_output=True,
text=True,
cwd=repo_root
)

if result.returncode != 0:
logger.error(f"Failed to remove worktree: {result.stderr}")
return False

# Optionally delete the branch
if delete_branch:
subprocess.run(
["git", "branch", "-D", branch_name],
capture_output=True,
cwd=repo_root
)
logger.info(f"✅ Deleted branch {branch_name}")

logger.info(f"✅ Worktree cleaned up: {task_id}")
return True


def get_worktree_path(task_id: str) -> Optional[str]:
"""
Get the worktree path for a task if it exists.

Args:
task_id: Unique identifier for the task

Returns:
Absolute path to worktree, or None if it doesn't exist
"""
repo_root = get_repo_root()
worktree_path = os.path.join(repo_root, WORKTREE_DIR, f"task-{task_id}")

if os.path.exists(worktree_path):
return worktree_path
return None


def list_worktrees() -> List[Dict[str, str]]:
"""
List all active worktrees.

Returns:
List of dicts with 'path', 'branch', and 'task_id' keys
"""
repo_root = get_repo_root()

result = subprocess.run(
["git", "worktree", "list", "--porcelain"],
capture_output=True,
text=True,
cwd=repo_root
)

worktrees = []
current = {}

for line in result.stdout.strip().split('\n'):
if line.startswith('worktree '):
if current:
worktrees.append(current)
current = {'path': line[9:]}
elif line.startswith('branch '):
branch = line[7:]
current['branch'] = branch
# Extract task_id from branch name
if branch.startswith('refs/heads/squadron/task-'):
current['task_id'] = branch.replace('refs/heads/squadron/task-', '')

if current:
worktrees.append(current)

# Filter to only return Squadron task worktrees
return [w for w in worktrees if w.get('task_id')]


def prune_stale_worktrees() -> int:
"""
Remove stale worktree references (worktrees that were deleted manually).

Returns:
Number of stale worktrees pruned
"""
repo_root = get_repo_root()

result = subprocess.run(
["git", "worktree", "prune", "-v"],
capture_output=True,
text=True,
cwd=repo_root
)

# Count pruned entries from verbose output
pruned = result.stdout.count('Removing')
if pruned:
logger.info(f"Pruned {pruned} stale worktree(s)")

return pruned
24 changes: 24 additions & 0 deletions squadron/swarm/agent.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@

import logging
import os
from typing import Optional
from squadron.brain import SquadronBrain
from squadron.services.model_factory import ModelFactory
from squadron.services.event_bus import emit_agent_start, emit_agent_thought, emit_agent_complete
from squadron.services.worktree import create_worktree, get_worktree_path


logger = logging.getLogger('SwarmAgent')
Expand Down Expand Up @@ -37,9 +39,26 @@ def process_task(self, task: str, context: dict = None) -> dict:
- original_request: The original user request
- previous_results: Results from prior steps
- notes: Any additional context
- task_id: If provided, creates an isolated worktree
"""
self._current_task = task

# Setup worktree isolation if task_id is provided
task_id = context.get('task_id') if context else None
worktree_path = None
original_cwd = os.getcwd()

if task_id:
try:
worktree_path = get_worktree_path(task_id)
if not worktree_path:
worktree_path = create_worktree(task_id)
os.chdir(worktree_path)
logger.info(f"🌳 [{self.name}] Working in isolated worktree: {worktree_path}")
except Exception as e:
logger.warning(f"⚠️ [{self.name}] Failed to create worktree: {e}. Continuing in main repo.")
worktree_path = None

logger.info(f"🤖 [{self.name}] Processing task: {task}")
if context:
logger.info(f" 📋 Context from: {context.get('delegated_by', 'unknown')}")
Expand Down Expand Up @@ -107,6 +126,11 @@ def _build_context_block(self, ctx):

emit_agent_complete(self.name, result["text"][:200])

# Return to original directory if we switched to worktree
if worktree_path:
os.chdir(original_cwd)
logger.info(f"🌳 [{self.name}] Returned from worktree to main repo")

self._current_task = None
self._current_thought = None
self._current_tool = None
Expand Down
37 changes: 37 additions & 0 deletions squadron/swarm/overseer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from typing import Optional
from .agent import AgentNode
from squadron.services.worktree import cleanup_worktree, list_worktrees as get_all_worktrees

logger = logging.getLogger('Overseer')

Expand Down Expand Up @@ -258,6 +259,42 @@ def get_agent_status(self) -> list:
"""Return agent status for dashboard."""
return [agent.get_status() for agent in self.agents.values()]

def finalize_task(self, task_id: str, action: str = "merge") -> bool:
"""
Finalize a task by merging or discarding its worktree.

Args:
task_id: The task ID to finalize
action: "merge" to merge changes into main, "discard" to throw away

Returns:
True if successful, False otherwise
"""
if action not in ["merge", "discard"]:
logger.error(f"Invalid action: {action}. Must be 'merge' or 'discard'")
return False

merge = (action == "merge")
success = cleanup_worktree(task_id, merge=merge)

if success:
new_status = "done" if merge else "discarded"
self.update_task_status(task_id, new_status if new_status in ["done"] else "done")
self._log_activity("task_finalized", {
"task_id": task_id,
"action": action,
"merged": merge
})
logger.info(f"✅ Task {task_id} finalized: {action}")
else:
logger.error(f"❌ Failed to finalize task {task_id}")

return success

def list_worktrees(self) -> list:
"""Return list of active worktrees for dashboard."""
return get_all_worktrees()


def _log_activity(self, event_type: str, data: dict):
"""Log activity for dashboard streaming."""
Expand Down
Loading