diff --git a/docs/cli/acw.md b/docs/cli/acw.md index ebf9c5f..46ee5c4 100644 --- a/docs/cli/acw.md +++ b/docs/cli/acw.md @@ -13,7 +13,7 @@ acw --help ## Description -`acw` provides a consistent interface for invoking different AI CLI tools (claude, codex, opencode, cursor/agent) with file-based input/output. Optional flags allow editor-based input and stdout output while preserving the default file-based workflow. Python workflows wrap `acw` through `agentize.workflow.utils.acw` to preserve the same invocation semantics and timing logs. +`acw` provides a consistent interface for invoking different AI CLI tools (claude, codex, opencode, cursor/agent) with file-based input/output. Optional flags allow editor-based input and stdout output while preserving the default file-based workflow. Python workflows wrap `acw` through `agentize.workflow.api.acw` to preserve the same invocation semantics and timing logs. ## Arguments diff --git a/docs/cli/lol.md b/docs/cli/lol.md index 17cdb5b..f420915 100644 --- a/docs/cli/lol.md +++ b/docs/cli/lol.md @@ -15,7 +15,7 @@ lol [options] python -m agentize.cli [options] ``` -The Python entrypoint delegates to shell functions for most commands. `lol impl` runs the Python workflow implementation, and the shell `impl.sh` delegates to it. `lol` is the only public shell entrypoint; helper functions are private implementation details. Use the Python entrypoint for non-sourced environments or scripting contexts where argparse-style parsing is preferred. +The Python entrypoint delegates to shell functions for most commands. `lol impl` runs the Python workflow implementation (via `agentize.workflow.api`), and the shell `impl.sh` delegates to it. `lol` is the only public shell entrypoint; helper functions are private implementation details. Use the Python entrypoint for non-sourced environments or scripting contexts where argparse-style parsing is preferred. 
## Commands diff --git a/docs/cli/planner.md b/docs/cli/planner.md index 889472c..84b9c91 100644 --- a/docs/cli/planner.md +++ b/docs/cli/planner.md @@ -64,7 +64,7 @@ Each stage uses `acw` for file-based CLI invocation. Prompts are rendered at run - Plan-guideline content (from `.claude-plugin/skills/plan-guideline/SKILL.md`, YAML frontmatter stripped) - Feature description and previous stage output -Prompt templates are rendered via `agentize.workflow.utils.prompt.render`, which replaces both `{{TOKEN}}` and `{#TOKEN#}` placeholders without requiring template format changes. +Prompt templates are rendered via `agentize.workflow.api.prompt.render`, which replaces both `{{TOKEN}}` and `{#TOKEN#}` placeholders without requiring template format changes. The consensus stage renders a dedicated prompt from `.claude-plugin/skills/external-consensus/external-review-prompt.md` with the three report outputs embedded. diff --git a/docs/feat/draft/api.md b/docs/feat/draft/api.md index 9c507bf..0a09bcc 100644 --- a/docs/feat/draft/api.md +++ b/docs/feat/draft/api.md @@ -25,6 +25,7 @@ Define a lightweight, imperative workflow API for coordinating agent sessions, w - output validation - retry with configurable attempts - consistent errors + - optional input/output path overrides for workflows that reuse fixed artifacts - `Session.run_parallel(...)` runs multiple sessions concurrently with the **same retry policy**. - `Session.stage(...)` builds a lightweight call object for `run_parallel(...)`. - `StageResult` exposes `stage`, `input_path`, `output_path`, and `process` with a `.text()` helper. 
@@ -72,7 +73,7 @@ python/agentize/workflow/api/ - **Constructor**: `Session(output_dir, prefix, *, runner=run_acw, input_suffix="-input.md", output_suffix="-output.md")` - **run_prompt**: - - `run_prompt(name, prompt, backend, *, tools=None, permission_mode=None, timeout=3600, extra_flags=None, retry=0, retry_delay=0) -> StageResult` + - `run_prompt(name, prompt, backend, *, tools=None, permission_mode=None, timeout=3600, extra_flags=None, retry=0, retry_delay=0, input_path=None, output_path=None) -> StageResult` - Writes input file, runs ACW, validates output, retries on failure. - **stage**: - `stage(name, prompt, backend, **opts) -> StageCall` diff --git a/python/agentize/workflow/__init__.md b/python/agentize/workflow/__init__.md index 49d2dc8..4b7ed51 100644 --- a/python/agentize/workflow/__init__.md +++ b/python/agentize/workflow/__init__.md @@ -4,7 +4,7 @@ Public interfaces for Python planner and impl workflow orchestration. ## External Interfaces -### From `utils/` +### From `api/` #### `run_acw` @@ -143,7 +143,7 @@ This module re-exports interfaces from submodules and does not define internal h | Module | Purpose | |--------|---------| -| `utils/` | Helper package for ACW invocation, GitHub operations, prompt rendering, and path resolution | +| `api/` | Public workflow API including Session DSL and helper utilities | | `planner/` | Standalone planning pipeline package (`python -m agentize.workflow.planner`) | | `planner.py` | Backward-compatible re-exports (deprecated) | | `impl/` | Issue-to-implementation workflow (Python) with file-based prompt | diff --git a/python/agentize/workflow/__init__.py b/python/agentize/workflow/__init__.py index cf6b032..e680bfb 100644 --- a/python/agentize/workflow/__init__.py +++ b/python/agentize/workflow/__init__.py @@ -10,7 +10,7 @@ from agentize.workflow.impl import ImplError, run_impl_workflow from agentize.workflow.planner import StageResult, run_planner_pipeline -from agentize.workflow.utils import ACW, run_acw +from 
agentize.workflow.api import ACW, run_acw __all__ = [ "ImplError", diff --git a/python/agentize/workflow/api/README.md b/python/agentize/workflow/api/README.md new file mode 100644 index 0000000..cf2e2d4 --- /dev/null +++ b/python/agentize/workflow/api/README.md @@ -0,0 +1,13 @@ +# Workflow API Package + +Public workflow API for building imperative agent pipelines. This package exposes the Session DSL alongside shared helpers for ACW invocation, prompt rendering, GitHub automation, and path resolution. + +## Organization + +- `__init__.py` - Convenience re-exports for public API symbols +- `session.py` - Session DSL for running staged workflows (single and parallel) +- `acw.py` - ACW invocation helpers with timing logs and provider validation +- `gh.py` - GitHub CLI wrappers for issue/label/PR actions +- `prompt.py` - Prompt rendering for `{#TOKEN#}` and `{{TOKEN}}` placeholders +- `path.py` - Path resolution helper relative to a module file +- Companion `.md` files document interfaces and internal helpers diff --git a/python/agentize/workflow/api/__init__.md b/python/agentize/workflow/api/__init__.md new file mode 100644 index 0000000..aee3430 --- /dev/null +++ b/python/agentize/workflow/api/__init__.md @@ -0,0 +1,112 @@ +# Module: agentize.workflow.api + +Public workflow API surface providing the Session DSL and ACW helpers. + +## External Interfaces + +### `Session` + +```python +class Session: + def __init__(...): ... + def run_prompt(...): ... + def stage(...): ... + def run_parallel(...): ... +``` + +Re-export of `agentize.workflow.api.session.Session`. + +### `StageResult` + +```python +@dataclass +class StageResult: + stage: str + input_path: Path + output_path: Path + process: subprocess.CompletedProcess +``` + +Re-export of `agentize.workflow.api.session.StageResult`. 
+ +### `StageCall` + +```python +@dataclass +class StageCall: + stage: str + prompt: str | Callable[[Path], str] + backend: tuple[str, str] + options: dict[str, Any] +``` + +Re-export of `agentize.workflow.api.session.StageCall`. + +### `PipelineError` + +```python +class PipelineError(RuntimeError): ... +``` + +Re-export of `agentize.workflow.api.session.PipelineError`. + +### `run_acw` + +```python +def run_acw( + provider: str, + model: str, + input_file: str | Path, + output_file: str | Path, + *, + tools: str | None = None, + permission_mode: str | None = None, + extra_flags: list[str] | None = None, + timeout: int = 3600, + cwd: str | Path | None = None, + env: dict[str, str] | None = None, +) -> subprocess.CompletedProcess +``` + +Re-export of `agentize.workflow.api.acw.run_acw`. + +### `list_acw_providers` + +```python +def list_acw_providers() -> list[str] +``` + +Re-export of `agentize.workflow.api.acw.list_acw_providers`. + +### `ACW` + +```python +class ACW: + def __init__( + self, + name: str, + provider: str, + model: str, + timeout: int = 900, + *, + tools: str | None = None, + permission_mode: str | None = None, + extra_flags: list[str] | None = None, + log_writer: Callable[[str], None] | None = None, + runner: Callable[..., subprocess.CompletedProcess] | None = None, + ) -> None: ... + def run(self, input_file: str | Path, output_file: str | Path) -> subprocess.CompletedProcess: ... +``` + +Re-export of `agentize.workflow.api.acw.ACW`. + +## Internal Helpers + +This module only re-exports selected helpers and does not define its own internal +implementation. + +## Design Rationale + +- **Single entry point**: A stable import surface for the Session DSL and ACW helpers. +- **Focused exports**: Re-exports stay limited to workflow primitives without exposing + internal convenience logic from other packages. 
diff --git a/python/agentize/workflow/api/__init__.py b/python/agentize/workflow/api/__init__.py new file mode 100644 index 0000000..86a571f --- /dev/null +++ b/python/agentize/workflow/api/__init__.py @@ -0,0 +1,16 @@ +"""Public workflow API: Session DSL plus ACW helpers.""" + +from __future__ import annotations + +from agentize.workflow.api.acw import ACW, list_acw_providers, run_acw +from agentize.workflow.api.session import PipelineError, Session, StageCall, StageResult + +__all__ = [ + "ACW", + "list_acw_providers", + "run_acw", + "Session", + "StageCall", + "StageResult", + "PipelineError", +] diff --git a/python/agentize/workflow/utils/acw.md b/python/agentize/workflow/api/acw.md similarity index 100% rename from python/agentize/workflow/utils/acw.md rename to python/agentize/workflow/api/acw.md diff --git a/python/agentize/workflow/utils/acw.py b/python/agentize/workflow/api/acw.py similarity index 100% rename from python/agentize/workflow/utils/acw.py rename to python/agentize/workflow/api/acw.py diff --git a/python/agentize/workflow/utils/gh.md b/python/agentize/workflow/api/gh.md similarity index 100% rename from python/agentize/workflow/utils/gh.md rename to python/agentize/workflow/api/gh.md diff --git a/python/agentize/workflow/utils/gh.py b/python/agentize/workflow/api/gh.py similarity index 100% rename from python/agentize/workflow/utils/gh.py rename to python/agentize/workflow/api/gh.py diff --git a/python/agentize/workflow/utils/path.md b/python/agentize/workflow/api/path.md similarity index 100% rename from python/agentize/workflow/utils/path.md rename to python/agentize/workflow/api/path.md diff --git a/python/agentize/workflow/utils/path.py b/python/agentize/workflow/api/path.py similarity index 100% rename from python/agentize/workflow/utils/path.py rename to python/agentize/workflow/api/path.py diff --git a/python/agentize/workflow/utils/prompt.md b/python/agentize/workflow/api/prompt.md similarity index 100% rename from 
python/agentize/workflow/utils/prompt.md rename to python/agentize/workflow/api/prompt.md diff --git a/python/agentize/workflow/utils/prompt.py b/python/agentize/workflow/api/prompt.py similarity index 100% rename from python/agentize/workflow/utils/prompt.py rename to python/agentize/workflow/api/prompt.py diff --git a/python/agentize/workflow/api/session.md b/python/agentize/workflow/api/session.md new file mode 100644 index 0000000..34d76f3 --- /dev/null +++ b/python/agentize/workflow/api/session.md @@ -0,0 +1,138 @@ +# session.py + +Session DSL for running staged agent workflows with consistent artifact handling, retries, and parallel execution. + +## External Interface + +### `Session` + +```python +def __init__( + self, + output_dir: str | Path, + prefix: str, + *, + runner: Callable[..., subprocess.CompletedProcess] = run_acw, + input_suffix: str = "-input.md", + output_suffix: str = "-output.md", +) -> None +``` + +**Purpose**: Configure a workflow session rooted at `output_dir` with a shared artifact prefix and an injectable ACW runner. + +**Parameters**: +- `output_dir`: Directory for input/output artifacts (created if missing). +- `prefix`: Filename prefix used when input/output paths are not overridden. +- `runner`: ACW-compatible callable (defaults to `run_acw`). +- `input_suffix`: Default suffix for generated input filenames. +- `output_suffix`: Default suffix for generated output filenames. + +### `Session.run_prompt()` + +```python +def run_prompt( + self, + name: str, + prompt: str | Callable[[Path], str], + backend: tuple[str, str], + *, + tools: str | None = None, + permission_mode: str | None = None, + timeout: int = 3600, + extra_flags: list[str] | None = None, + retry: int = 0, + retry_delay: float = 0, + input_path: str | Path | None = None, + output_path: str | Path | None = None, +) -> StageResult +``` + +Runs a single stage with retries and output validation. 
+ +**Behavior**: +- Resolves input/output paths from `prefix` + suffixes unless overrides are provided. +- Writes the prompt to the input path (string content or a writer callable). +- Executes the runner with stage-level tools and permission mode. +- Validates output (non-zero exit, missing output, or empty output triggers retry). +- Retries up to `1 + retry` attempts; raises `PipelineError` on failure. + +### `Session.stage()` + +```python +def stage( + self, + name: str, + prompt: str | Callable[[Path], str], + backend: tuple[str, str], + **opts: Any, +) -> StageCall +``` + +Creates a lightweight stage call object for `run_parallel()`. + +### `Session.run_parallel()` + +```python +def run_parallel( + self, + calls: Iterable[StageCall], + *, + max_workers: int = 2, + retry: int = 0, + retry_delay: float = 0, +) -> dict[str, StageResult] +``` + +Runs multiple stages concurrently with a shared retry policy and returns results keyed by stage name. + +### `StageResult` + +```python +@dataclass +class StageResult: + stage: str + input_path: Path + output_path: Path + process: subprocess.CompletedProcess + + def text(self) -> str: ... +``` + +Represents a successful stage execution. `.text()` reads the output file as a string. + +### `StageCall` + +```python +@dataclass +class StageCall: + stage: str + prompt: str | Callable[[Path], str] + backend: tuple[str, str] + options: dict[str, Any] +``` + +Captures the inputs for a stage scheduled via `run_parallel()`. + +### `PipelineError` + +```python +class PipelineError(RuntimeError): + stage: str + attempts: int + last_error: Exception | str +``` + +Raised after retry exhaustion, carrying stage metadata and the last failure detail. + +## Internal Helpers + +- `_resolve_paths()`: Applies default suffixes and normalizes path overrides. +- `_write_prompt()`: Writes prompt content to the input artifact path. +- `_run_stage()`: Builds the stage `ACW` runner and executes it; the retry loop lives inline in `run_prompt()`. 
+- `_validate_output()`: Ensures successful exit code and non-empty output. + +## Design Rationale + +- **Consistent artifacts**: Centralized path resolution ensures predictable filenames and keeps workflows focused on orchestration logic. +- **Shared validation**: Output checks and retries live in one place to avoid duplicated error handling across pipelines. +- **Minimal concurrency**: A small `run_parallel()` wrapper covers the common fan-out use case without adding heavy orchestration layers. diff --git a/python/agentize/workflow/api/session.py b/python/agentize/workflow/api/session.py new file mode 100644 index 0000000..0d0f99a --- /dev/null +++ b/python/agentize/workflow/api/session.py @@ -0,0 +1,246 @@ +"""Session DSL for running staged workflows with ACW.""" + +from __future__ import annotations + +import subprocess +import sys +import threading +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Callable, Iterable + +from agentize.workflow.api.acw import ACW, run_acw + +PromptWriter = Callable[[Path], str] +PromptInput = str | PromptWriter + + +@dataclass(frozen=True) +class StageResult: + """Result for a single pipeline stage.""" + + stage: str + input_path: Path + output_path: Path + process: subprocess.CompletedProcess + + def text(self) -> str: + return self.output_path.read_text() + + +@dataclass(frozen=True) +class StageCall: + """Call specification for a stage executed in run_parallel.""" + + stage: str + prompt: PromptInput + backend: tuple[str, str] + options: dict[str, Any] + + +class PipelineError(RuntimeError): + """Raised when a stage exhausts its retry budget.""" + + def __init__(self, stage: str, attempts: int, last_error: Exception | str) -> None: + self.stage = stage + self.attempts = attempts + self.last_error = last_error + detail = last_error if isinstance(last_error, str) else str(last_error) + super().__init__(f"Stage '{stage}' failed after {attempts} attempts: {detail}") + + +class 
Session: + """Imperative workflow session with shared artifact settings.""" + + def __init__( + self, + output_dir: str | Path, + prefix: str, + *, + runner: Callable[..., subprocess.CompletedProcess] = run_acw, + input_suffix: str = "-input.md", + output_suffix: str = "-output.md", + ) -> None: + self._output_dir = Path(output_dir) + self._output_dir.mkdir(parents=True, exist_ok=True) + self._prefix = prefix + self._runner = runner + self._input_suffix = input_suffix + self._output_suffix = output_suffix + self._log_lock = threading.Lock() + + def _log(self, message: str) -> None: + with self._log_lock: + print(message, file=sys.stderr) + + def _normalize_path(self, path: str | Path) -> Path: + path = Path(path) + if path.is_absolute(): + return path + return self._output_dir / path + + def _resolve_paths( + self, + stage: str, + input_path: str | Path | None, + output_path: str | Path | None, + ) -> tuple[Path, Path]: + if input_path is None: + resolved_input = self._output_dir / f"{self._prefix}-{stage}{self._input_suffix}" + else: + resolved_input = self._normalize_path(input_path) + + if output_path is None: + resolved_output = self._output_dir / f"{self._prefix}-{stage}{self._output_suffix}" + else: + resolved_output = self._normalize_path(output_path) + + return resolved_input, resolved_output + + def _write_prompt(self, prompt: PromptInput, input_path: Path) -> None: + if callable(prompt): + rendered = prompt(input_path) + if not input_path.exists(): + if rendered is None: + raise ValueError( + "Prompt writer did not write input file or return content" + ) + input_path.write_text(rendered) + return + if not isinstance(prompt, str): + raise TypeError("prompt must be a string or a callable writer") + input_path.write_text(prompt) + + def _run_stage( + self, + name: str, + backend: tuple[str, str], + input_path: Path, + output_path: Path, + *, + tools: str | None, + permission_mode: str | None, + timeout: int, + extra_flags: list[str] | None, + ) -> 
subprocess.CompletedProcess: + provider, model = backend + acw_runner = ACW( + name=name, + provider=provider, + model=model, + timeout=timeout, + tools=tools, + permission_mode=permission_mode, + extra_flags=extra_flags, + log_writer=self._log, + runner=self._runner, + ) + return acw_runner.run(input_path, output_path) + + def _validate_output(self, stage: str, output_path: Path, process: subprocess.CompletedProcess) -> None: + if process.returncode != 0: + raise RuntimeError( + f"Stage '{stage}' failed with exit code {process.returncode}" + ) + if not output_path.exists() or output_path.stat().st_size == 0: + raise RuntimeError(f"Stage '{stage}' produced no output") + + def run_prompt( + self, + name: str, + prompt: PromptInput, + backend: tuple[str, str], + *, + tools: str | None = None, + permission_mode: str | None = None, + timeout: int = 3600, + extra_flags: list[str] | None = None, + retry: int = 0, + retry_delay: float = 0, + input_path: str | Path | None = None, + output_path: str | Path | None = None, + ) -> StageResult: + input_path_resolved, output_path_resolved = self._resolve_paths( + name, input_path, output_path + ) + + attempts = 0 + last_error: Exception | str = "" + + for attempt in range(1, retry + 2): + attempts = attempt + try: + self._write_prompt(prompt, input_path_resolved) + process = self._run_stage( + name, + backend, + input_path_resolved, + output_path_resolved, + tools=tools, + permission_mode=permission_mode, + timeout=timeout, + extra_flags=extra_flags, + ) + self._validate_output(name, output_path_resolved, process) + return StageResult( + stage=name, + input_path=input_path_resolved, + output_path=output_path_resolved, + process=process, + ) + except Exception as exc: + last_error = exc + if attempt <= retry and retry_delay > 0: + time.sleep(retry_delay) + + raise PipelineError(name, attempts, last_error) + + def stage( + self, + name: str, + prompt: PromptInput, + backend: tuple[str, str], + **opts: Any, + ) -> StageCall: + if 
"retry" in opts or "retry_delay" in opts: + raise ValueError("retry and retry_delay are configured on run_parallel") + return StageCall(stage=name, prompt=prompt, backend=backend, options=opts) + + def run_parallel( + self, + calls: Iterable[StageCall], + *, + max_workers: int = 2, + retry: int = 0, + retry_delay: float = 0, + ) -> dict[str, StageResult]: + results: dict[str, StageResult] = {} + futures = {} + stage_names: set[str] = set() + + from concurrent.futures import ThreadPoolExecutor + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + for call in calls: + if call.stage in stage_names: + raise ValueError(f"Duplicate stage name '{call.stage}'") + stage_names.add(call.stage) + futures[executor.submit( + self.run_prompt, + call.stage, + call.prompt, + call.backend, + retry=retry, + retry_delay=retry_delay, + **call.options, + )] = call.stage + + for future, stage in list(futures.items()): + result = future.result() + results[stage] = result + + return results + + +__all__ = ["Session", "StageCall", "StageResult", "PipelineError"] diff --git a/python/agentize/workflow/impl/impl.md b/python/agentize/workflow/impl/impl.md index df62ea0..5aed864 100644 --- a/python/agentize/workflow/impl/impl.md +++ b/python/agentize/workflow/impl/impl.md @@ -29,10 +29,11 @@ source of truth. **Behavior**: - Resolves the issue worktree via `wt pathto`, spawning with `wt spawn --no-agent` if needed. - Syncs the issue branch by fetching and rebasing onto the detected default branch before iterations. -- Prefetches issue content via `agentize.workflow.utils.gh` into `.tmp/issue-.md`; fails if empty. +- Prefetches issue content via `agentize.workflow.api.gh` into `.tmp/issue-.md`; fails if empty. - Renders iteration prompts from `continue-prompt.md` into `.tmp/impl-input-.txt` - via `agentize.workflow.utils.prompt.render`. -- Runs the shared `ACW` runner (provider validation + timing logs) and captures output in `.tmp/impl-output.txt`. 
+ via `agentize.workflow.api.prompt.render`. +- Runs iterations through `Session.run_prompt()` with input/output path overrides to reuse + `.tmp/impl-output.txt` across iterations. - Requires `.tmp/commit-report-iter-.txt` for commits; stages and commits when diffs exist. - Detects completion via `.tmp/finalize.txt` containing `Issue resolved`. - Pushes the branch and opens a PR using the completion file as title/body. @@ -66,7 +67,7 @@ placeholder tokens and splices optional sections. ## Internal Helpers ### rel_path() -Resolves template files relative to `impl.py` for portability using `utils.path.relpath`. +Resolves template files relative to `impl.py` for portability using `api.path.relpath`. ### render_prompt() Builds the iteration prompt by filling placeholders and conditionally inserting diff --git a/python/agentize/workflow/impl/impl.py b/python/agentize/workflow/impl/impl.py index f9afcfe..cb19be1 100644 --- a/python/agentize/workflow/impl/impl.py +++ b/python/agentize/workflow/impl/impl.py @@ -9,10 +9,11 @@ from typing import Iterable from agentize.shell import run_shell_function -from agentize.workflow.utils import ACW -from agentize.workflow.utils import gh as gh_utils -from agentize.workflow.utils import path as path_utils -from agentize.workflow.utils import prompt as prompt_utils +from agentize.workflow.api import Session +from agentize.workflow.api import gh as gh_utils +from agentize.workflow.api import path as path_utils +from agentize.workflow.api import prompt as prompt_utils +from agentize.workflow.api.session import PipelineError class ImplError(RuntimeError): @@ -347,6 +348,8 @@ def run_impl_workflow( output_file = tmp_dir / "impl-output.txt" finalize_file = tmp_dir / "finalize.txt" + session = Session(output_dir=tmp_dir, prefix=f"impl-{issue_no}") + _prefetch_issue(issue_no, issue_file, cwd=worktree) template_path = rel_path("continue-prompt.md") @@ -373,7 +376,7 @@ def run_impl_workflow( tmp_dir / f"commit-report-iter-{iteration - 1}.txt" ) 
- render_prompt( + prompt_text = render_prompt( template_path, issue_no=issue_no, issue_file=issue_file, @@ -385,21 +388,18 @@ def run_impl_workflow( ) extra_flags = ["--yolo"] if yolo else None - - def _log_writer(message: str) -> None: - print(message, file=sys.stderr) - - acw_runner = ACW( - name=f"impl-iter-{iteration}", - provider=provider, - model=model, - extra_flags=extra_flags, - log_writer=_log_writer, - ) - acw_result = acw_runner.run(input_file, output_file) - if acw_result.returncode != 0: + try: + session.run_prompt( + f"impl-iter-{iteration}", + prompt_text, + (provider, model), + extra_flags=extra_flags, + input_path=input_file, + output_path=output_file, + ) + except PipelineError as exc: print( - f"Warning: acw exited with non-zero status on iteration {iteration}", + f"Warning: acw failed on iteration {iteration} ({exc})", file=sys.stderr, ) diff --git a/python/agentize/workflow/planner.md b/python/agentize/workflow/planner.md index b9be5b9..d8e6de9 100644 --- a/python/agentize/workflow/planner.md +++ b/python/agentize/workflow/planner.md @@ -51,7 +51,7 @@ def run_acw( ) -> subprocess.CompletedProcess ``` -Re-export of the ACW shell invocation helper. +Re-export of the ACW shell invocation helper from `agentize.workflow.api`. 
## Internal Helpers diff --git a/python/agentize/workflow/planner.py b/python/agentize/workflow/planner.py index 0bc014a..0dc2d18 100644 --- a/python/agentize/workflow/planner.py +++ b/python/agentize/workflow/planner.py @@ -7,7 +7,7 @@ """ # Re-export everything from the new locations for backward compatibility -from agentize.workflow.utils import run_acw +from agentize.workflow.api import run_acw from agentize.workflow.planner import run_planner_pipeline, StageResult __all__ = ["run_acw", "run_planner_pipeline", "StageResult"] diff --git a/python/agentize/workflow/planner/README.md b/python/agentize/workflow/planner/README.md index 6c557e7..4b028ec 100644 --- a/python/agentize/workflow/planner/README.md +++ b/python/agentize/workflow/planner/README.md @@ -4,7 +4,7 @@ Runnable package for the multi-stage planner pipeline. Provides both library int ## Purpose -This package contains the 5-stage planner pipeline (understander → bold → critique → reducer → consensus) that powers `lol plan`. It is structured as a runnable package to support `python -m agentize.workflow.planner` invocation. +This package contains the 5-stage planner pipeline (understander → bold → critique → reducer → consensus) that powers `lol plan`. It is structured as a runnable package to support `python -m agentize.workflow.planner` invocation, with the pipeline implemented as a Session DSL example. 
## Invocation @@ -40,7 +40,8 @@ for stage, result in results.items(): | File | Purpose | |------|---------| | `__init__.py` | Package exports: `run_planner_pipeline`, `StageResult` | -| `__main__.py` | Pipeline logic, CLI backend, and entry point | +| `pipeline.py` | Planner pipeline implemented with the Session DSL | +| `__main__.py` | CLI backend and entry point | | `README.md` | This documentation | ## Exports @@ -50,11 +51,11 @@ for stage, result in results.items(): ## Dependencies -- `agentize.workflow.utils`: ACW runner, prompt rendering, and GitHub helpers +- `agentize.workflow.api`: Session DSL, ACW runner, prompt rendering, and GitHub helpers - `agentize.shell`: `get_agentize_home()` for path resolution - Prompt templates in `.claude-plugin/agents/` and `.claude-plugin/skills/` ## Design Rationale - **Runnable package**: Using `__main__.py` enables `python -m` invocation while keeping logic in a single file. -- **Separation**: Helper utilities live in `workflow/utils/`; pipeline orchestration lives here. +- **Separation**: Workflow helpers live in `workflow/api/`; pipeline orchestration lives here. diff --git a/python/agentize/workflow/planner/__init__.md b/python/agentize/workflow/planner/__init__.md index 1ddbe02..f9211ee 100644 --- a/python/agentize/workflow/planner/__init__.md +++ b/python/agentize/workflow/planner/__init__.md @@ -20,7 +20,7 @@ def run_planner_pipeline( ) -> dict[str, StageResult] ``` -Re-export of the planner pipeline execution entry point. +Re-export of the planner pipeline execution entry point from `pipeline.py`. ### `StageResult` @@ -37,5 +37,5 @@ Re-export of the per-stage result dataclass. ## Internal Helpers -This module re-exports interfaces from `planner.__main__` and does not define internal +This module re-exports interfaces from `planner.pipeline` and does not define internal helpers. 
diff --git a/python/agentize/workflow/planner/__init__.py b/python/agentize/workflow/planner/__init__.py index bb5328b..4ec044d 100644 --- a/python/agentize/workflow/planner/__init__.py +++ b/python/agentize/workflow/planner/__init__.py @@ -8,7 +8,7 @@ - StageResult: Dataclass for per-stage results """ -from agentize.workflow.planner.__main__ import ( +from agentize.workflow.planner.pipeline import ( run_planner_pipeline, StageResult, ) diff --git a/python/agentize/workflow/planner/__main__.md b/python/agentize/workflow/planner/__main__.md index 9be95ec..6b465a5 100644 --- a/python/agentize/workflow/planner/__main__.md +++ b/python/agentize/workflow/planner/__main__.md @@ -1,45 +1,9 @@ # __main__.py -Planner pipeline orchestration and CLI backend for `python -m agentize.workflow.planner`. +CLI backend for `python -m agentize.workflow.planner`, delegating pipeline execution to `pipeline.py`. ## External Interfaces -### `StageResult` - -```python -@dataclass -class StageResult: - stage: str - input_path: Path - output_path: Path - process: subprocess.CompletedProcess -``` - -Represents a single stage execution result, including the input/output artifact paths and -subprocess result. - -### `run_planner_pipeline()` - -```python -def run_planner_pipeline( - feature_desc: str, - *, - output_dir: str | Path = ".tmp", - backends: dict[str, tuple[str, str]] | None = None, - parallel: bool = True, - runner: Callable[..., subprocess.CompletedProcess] = run_acw, - prefix: str | None = None, - output_suffix: str = "-output.md", - skip_consensus: bool = False, -) -> dict[str, StageResult] -``` - -Executes the 5-stage planner pipeline. Stages run through the `ACW` class (provider -validation + start/finish timing logs). Prompt templates are rendered via -`agentize.workflow.utils.prompt` to support both `{{TOKEN}}` and `{#TOKEN#}` placeholders. -The pipeline prints plain stage-start lines to stderr and returns `StageResult` objects -for each stage. 
- ### `main()` ```python @@ -53,23 +17,16 @@ the footer before reuse as debate context. Returns process exit code. ## Internal Helpers -### Prompt rendering - -- `_render_stage_prompt()`: Builds each stage prompt from agent template, plan-guideline - content, feature description, and previous outputs using `prompt.read_prompt()`. -- `_render_consensus_prompt()`: Builds the consensus prompt by embedding bold/critique/ - reducer outputs into the external-consensus template using `prompt.render()`. - ### Stage execution -- `_run_consensus_stage()`: Runs the consensus stage and returns a `StageResult`. - Uses `ACW` when the default `run_acw` runner is in use, accepting an optional - `log_writer` for serialized log output. +The CLI delegates stage execution to `pipeline.run_planner_pipeline()` and +`pipeline.run_consensus_stage()`, which use the Session DSL and the workflow API +helpers for prompt rendering and ACW invocation. ### Issue/publish helpers - `_issue_create()`, `_issue_fetch()`, `_issue_publish()`: GitHub issue lifecycle for - plan publishing backed by `agentize.workflow.utils.gh`. + plan publishing backed by `agentize.workflow.api.gh`. - `_extract_plan_title()`, `_apply_issue_tag()`: Plan title parsing and issue tagging. - `_resolve_commit_hash()`: Resolves the current repo `HEAD` commit for provenance. - `_append_plan_footer()`: Appends `Plan based on commit ` to consensus output. @@ -82,9 +39,9 @@ the footer before reuse as debate context. Returns process exit code. ## Design Rationale -- **Unified runner path**: The pipeline uses the `ACW` class for stage execution so - timing logs and provider validation remain consistent. +- **Pipeline separation**: CLI orchestration lives here while the Session-based pipeline + stays in `pipeline.py`. - **Plain progress output**: The CLI prints concise stage lines without TTY-specific rendering to keep logs readable in terminals and CI. 
-- **Isolation**: Prompt rendering and issue/publish logic are kept in helpers to reduce - coupling between orchestration and IO concerns. +- **Isolation**: Issue/publish logic is kept in helpers to reduce coupling between CLI + glue and workflow execution. diff --git a/python/agentize/workflow/planner/__main__.py b/python/agentize/workflow/planner/__main__.py index 74587a1..92f7ae5 100644 --- a/python/agentize/workflow/planner/__main__.py +++ b/python/agentize/workflow/planner/__main__.py @@ -12,349 +12,13 @@ import re import subprocess import sys -import threading -from concurrent.futures import ThreadPoolExecutor -from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import Callable, Optional +from typing import Optional -from agentize.shell import get_agentize_home -from agentize.workflow.utils import ACW, run_acw -from agentize.workflow.utils import gh as gh_utils -from agentize.workflow.utils import prompt as prompt_utils - - -@dataclass -class StageResult: - """Result for a single pipeline stage.""" - - stage: str - input_path: Path - output_path: Path - process: subprocess.CompletedProcess - - -# ============================================================ -# Stage Configuration -# ============================================================ - -# Stage names in execution order -STAGES = ["understander", "bold", "critique", "reducer", "consensus"] - -# Agent prompt paths (relative to AGENTIZE_HOME) -AGENT_PROMPTS = { - "understander": ".claude-plugin/agents/understander.md", - "bold": ".claude-plugin/agents/bold-proposer.md", - "critique": ".claude-plugin/agents/proposal-critique.md", - "reducer": ".claude-plugin/agents/proposal-reducer.md", -} - -# Stages that include plan-guideline content -STAGES_WITH_PLAN_GUIDELINE = {"bold", "critique", "reducer"} - -# Default backends per stage (provider, model) -DEFAULT_BACKENDS = { - "understander": ("claude", "sonnet"), - "bold": ("claude", "opus"), - "critique": 
("claude", "opus"), - "reducer": ("claude", "opus"), - "consensus": ("claude", "opus"), -} - -# Tool configurations per stage (Claude provider only) -STAGE_TOOLS = { - "understander": "Read,Grep,Glob", - "bold": "Read,Grep,Glob,WebSearch,WebFetch", - "critique": "Read,Grep,Glob,Bash", - "reducer": "Read,Grep,Glob", - "consensus": "Read,Grep,Glob", -} - -# Permission mode per stage (Claude provider only) -STAGE_PERMISSION_MODE = { - "bold": "plan", -} - - -# ============================================================ -# Prompt Rendering -# ============================================================ - - -def _render_stage_prompt( - stage: str, - feature_desc: str, - agentize_home: Path, - previous_output: str | None = None, -) -> str: - """Render the input prompt for a stage. - - Args: - stage: Stage name - feature_desc: Feature request description - agentize_home: Path to agentize repository root - previous_output: Output from previous stage (if any) - - Returns: - Rendered prompt content - """ - parts = [] - - # Add agent base prompt (if not consensus) - if stage in AGENT_PROMPTS: - agent_path = agentize_home / AGENT_PROMPTS[stage] - parts.append(prompt_utils.read_prompt(agent_path, strip_frontmatter=True)) - - # Add plan-guideline for applicable stages - if stage in STAGES_WITH_PLAN_GUIDELINE: - plan_guideline_path = ( - agentize_home / ".claude-plugin/skills/plan-guideline/SKILL.md" - ) - if plan_guideline_path.exists(): - parts.append("\n---\n") - parts.append("# Planning Guidelines\n") - parts.append(prompt_utils.read_prompt(plan_guideline_path, strip_frontmatter=True)) - - # Add feature description - parts.append("\n---\n") - parts.append("# Feature Request\n") - parts.append(feature_desc) - - # Add previous stage output if provided - if previous_output: - parts.append("\n---\n") - parts.append("# Previous Stage Output\n") - parts.append(previous_output) - - return "\n".join(parts) - - -def _build_combined_report( - bold_output: str, - critique_output: str, 
- reducer_output: str, -) -> str: - """Build the combined report for the consensus template.""" - return f"""## Bold Proposer Output - -{bold_output} - -## Critique Output - -{critique_output} - -## Reducer Output - -{reducer_output} -""" - - -def _render_consensus_prompt( - feature_desc: str, - combined_report: str, - agentize_home: Path, - dest_path: Path, -) -> str: - """Render the consensus prompt with combined report and write to dest_path.""" - template_path = ( - agentize_home - / ".claude-plugin/skills/external-consensus/external-review-prompt.md" - ) - return prompt_utils.render( - template_path, - {"FEATURE_DESCRIPTION": feature_desc, "COMBINED_REPORT": combined_report}, - dest_path, - strip_frontmatter=True, - ) - - -# ============================================================ -# Pipeline Orchestration -# ============================================================ - - -def run_planner_pipeline( - feature_desc: str, - *, - output_dir: str | Path = ".tmp", - backends: dict[str, tuple[str, str]] | None = None, - parallel: bool = True, - runner: Callable[..., subprocess.CompletedProcess] = run_acw, - prefix: str | None = None, - output_suffix: str = "-output.md", - skip_consensus: bool = False, -) -> dict[str, StageResult]: - """Execute the 5-stage planner pipeline. 
- - Args: - feature_desc: Feature request description to plan - output_dir: Directory for artifacts (default: .tmp) - backends: Provider/model mapping per stage (default: understander uses claude/sonnet, others claude/opus) - parallel: Run critique and reducer in parallel (default: True) - runner: Callable for stage execution (injectable for testing) - prefix: Artifact filename prefix (default: timestamp-based) - output_suffix: Suffix appended to stage output filenames - skip_consensus: Skip the consensus stage (default: False) - Returns: - Dict mapping stage names to StageResult objects - - Raises: - FileNotFoundError: If required prompt templates are missing - RuntimeError: If a stage execution fails - """ - agentize_home = Path(get_agentize_home()) - output_path = Path(output_dir) - output_path.mkdir(parents=True, exist_ok=True) - - # Determine artifact prefix - if prefix is None: - prefix = datetime.now().strftime("%Y%m%d-%H%M%S") - - # Merge backends with defaults - stage_backends = {**DEFAULT_BACKENDS} - if backends: - stage_backends.update(backends) - - results: dict[str, StageResult] = {} - log_lock = threading.Lock() - - def _log_writer(message: str) -> None: - with log_lock: - print(message, file=sys.stderr) - - def _log_stage(message: str) -> None: - with log_lock: - print(message, file=sys.stderr) - - def _stage_label(stage: str) -> str: - if stage == "bold": - return "bold-proposer" - return stage - - def _backend_label(stage: str) -> str: - provider, model = stage_backends[stage] - return f"{provider}:{model}" - - def _run_stage( - stage: str, - input_content: str | None = None, - previous_output: str | None = None, - *, - input_writer: Callable[[Path], str] | None = None, - ) -> StageResult: - """Run a single stage and return result.""" - input_path = output_path / f"{prefix}-{stage}-input.md" - output_file = output_path / f"{prefix}-{stage}{output_suffix}" - - # Write input prompt - if input_writer is not None: - input_writer(input_path) - else: - 
if input_content is None: - raise ValueError(f"Missing input content for stage '{stage}'") - input_path.write_text(input_content) - - # Get backend configuration - provider, model = stage_backends[stage] - - # Run stage via ACW (unified path for both default and custom runners) - acw_runner = ACW( - name=_stage_label(stage), - provider=provider, - model=model, - tools=STAGE_TOOLS.get(stage), - permission_mode=STAGE_PERMISSION_MODE.get(stage), - log_writer=_log_writer, - runner=runner, - ) - process = acw_runner.run(input_path, output_file) - - return StageResult( - stage=stage, - input_path=input_path, - output_path=output_file, - process=process, - ) - - def _check_stage_result(result: StageResult) -> None: - """Check if stage succeeded, raise RuntimeError if not.""" - if result.process.returncode != 0: - raise RuntimeError( - f"Stage '{result.stage}' failed with exit code {result.process.returncode}" - ) - if not result.output_path.exists() or result.output_path.stat().st_size == 0: - raise RuntimeError(f"Stage '{result.stage}' produced no output") - - # ── Stage 1: Understander ── - understander_prompt = _render_stage_prompt( - "understander", feature_desc, agentize_home - ) - _log_stage(f"Stage 1/5: Running understander ({_backend_label('understander')})") - results["understander"] = _run_stage("understander", understander_prompt) - _check_stage_result(results["understander"]) - understander_output = results["understander"].output_path.read_text() - - # ── Stage 2: Bold ── - bold_prompt = _render_stage_prompt( - "bold", feature_desc, agentize_home, understander_output - ) - _log_stage(f"Stage 2/5: Running bold-proposer ({_backend_label('bold')})") - results["bold"] = _run_stage("bold", bold_prompt) - _check_stage_result(results["bold"]) - bold_output = results["bold"].output_path.read_text() - - # ── Stage 3 & 4: Critique and Reducer ── - critique_prompt = _render_stage_prompt( - "critique", feature_desc, agentize_home, bold_output - ) - reducer_prompt = 
_render_stage_prompt( - "reducer", feature_desc, agentize_home, bold_output - ) - - mode_label = "parallel" if parallel else "sequentially" - _log_stage( - f"Stage 3-4/5: Running critique and reducer {mode_label} " - f"({_backend_label('critique')}, {_backend_label('reducer')})" - ) - if parallel: - # Run in parallel using ThreadPoolExecutor - with ThreadPoolExecutor(max_workers=2) as executor: - critique_future = executor.submit(_run_stage, "critique", critique_prompt) - reducer_future = executor.submit(_run_stage, "reducer", reducer_prompt) - - results["critique"] = critique_future.result() - results["reducer"] = reducer_future.result() - else: - # Run sequentially - results["critique"] = _run_stage("critique", critique_prompt) - results["reducer"] = _run_stage("reducer", reducer_prompt) - _check_stage_result(results["critique"]) - _check_stage_result(results["reducer"]) - critique_output = results["critique"].output_path.read_text() - reducer_output = results["reducer"].output_path.read_text() - - if skip_consensus: - return results - - combined_report = _build_combined_report( - bold_output, critique_output, reducer_output - ) - - def _write_consensus_prompt(path: Path) -> str: - return _render_consensus_prompt( - feature_desc, - combined_report, - agentize_home, - path, - ) - - # ── Stage 5: Consensus ── - _log_stage(f"Stage 5/5: Running consensus ({_backend_label('consensus')})") - results["consensus"] = _run_stage("consensus", input_writer=_write_consensus_prompt) - _check_stage_result(results["consensus"]) - - return results +from agentize.workflow.api import run_acw +from agentize.workflow.api import gh as gh_utils +from agentize.workflow.planner.pipeline import run_consensus_stage, run_planner_pipeline # ============================================================ @@ -649,59 +313,6 @@ def _apply_issue_tag(plan_title: str, issue_number: str) -> str: return issue_tag -def _run_consensus_stage( - feature_desc: str, - bold_path: Path, - critique_path: Path, - 
reducer_path: Path, - output_dir: Path, - prefix: str, - stage_backends: dict[str, tuple[str, str]], - *, - runner: Callable[..., subprocess.CompletedProcess] = run_acw, - log_writer: Callable[[str], None] | None = None, -) -> StageResult: - bold_output = bold_path.read_text() - critique_output = critique_path.read_text() - reducer_output = reducer_path.read_text() - agentize_home = Path(get_agentize_home()) - - combined_report = _build_combined_report( - bold_output, - critique_output, - reducer_output, - ) - - input_path = output_dir / f"{prefix}-consensus-input.md" - output_path = output_dir / f"{prefix}-consensus.md" - _render_consensus_prompt( - feature_desc, - combined_report, - agentize_home, - input_path, - ) - - provider, model = stage_backends["consensus"] - # Unified path: always use ACW, passing custom runner if provided - acw_runner = ACW( - name="consensus", - provider=provider, - model=model, - tools=STAGE_TOOLS.get("consensus"), - permission_mode=STAGE_PERMISSION_MODE.get("consensus"), - log_writer=log_writer, - runner=runner, - ) - process = acw_runner.run(input_path, output_path) - - return StageResult( - stage="consensus", - input_path=input_path, - output_path=output_path, - process=process, - ) - - def main(argv: list[str]) -> int: """CLI entrypoint for planner pipeline orchestration.""" parser = argparse.ArgumentParser(description="Planner pipeline backend") @@ -797,22 +408,17 @@ def _log_verbose(message: str) -> None: consensus_backend = stage_backends["consensus"] _log(f"Stage 5/5: Running consensus ({consensus_backend[0]}:{consensus_backend[1]})") - log_lock = threading.Lock() - def _log_writer(message: str) -> None: - with log_lock: - print(message, file=sys.stderr) try: - consensus_result = _run_consensus_stage( + consensus_result = run_consensus_stage( feature_desc, - results["bold"].output_path, - results["critique"].output_path, - results["reducer"].output_path, - output_dir, - prefix_name, - stage_backends, + 
bold_path=results["bold"].output_path, + critique_path=results["critique"].output_path, + reducer_path=results["reducer"].output_path, + output_dir=output_dir, + prefix=prefix_name, + stage_backends=stage_backends, runner=run_acw, - log_writer=_log_writer, ) except (FileNotFoundError, RuntimeError, subprocess.TimeoutExpired) as exc: print(f"Error: {exc}", file=sys.stderr) diff --git a/python/agentize/workflow/planner/pipeline.md b/python/agentize/workflow/planner/pipeline.md new file mode 100644 index 0000000..a4e2ef0 --- /dev/null +++ b/python/agentize/workflow/planner/pipeline.md @@ -0,0 +1,71 @@ +# pipeline.py + +Planner pipeline implementation built on the Session DSL. Provides the canonical example of the imperative workflow API. + +## External Interfaces + +### `run_planner_pipeline()` + +```python +def run_planner_pipeline( + feature_desc: str, + *, + output_dir: str | Path = ".tmp", + backends: dict[str, tuple[str, str]] | None = None, + parallel: bool = True, + runner: Callable[..., subprocess.CompletedProcess] = run_acw, + prefix: str | None = None, + output_suffix: str = "-output.md", + skip_consensus: bool = False, +) -> dict[str, StageResult] +``` + +Runs the 5-stage planner pipeline (understander → bold → critique → reducer → consensus). +Returns a mapping of stage names to `StageResult` objects. When `skip_consensus` is set, +only the first four stages are executed. + +### `run_consensus_stage()` + +```python +def run_consensus_stage( + feature_desc: str, + *, + bold_path: Path, + critique_path: Path, + reducer_path: Path, + output_dir: Path, + prefix: str, + stage_backends: dict[str, tuple[str, str]], + runner: Callable[..., subprocess.CompletedProcess] = run_acw, +) -> StageResult +``` + +Runs the consensus stage independently, writing the consensus prompt and output +artifacts (`*-consensus-input.md`, `*-consensus.md`). + +### `StageResult` + +`StageResult` is re-exported from the Session DSL and represents a single stage result. 
+ +## Internal Helpers + +### Prompt rendering + +- `_render_stage_prompt()`: Builds prompts by concatenating agent templates, plan-guideline + content, the feature description, and prior outputs. +- `_render_consensus_prompt()`: Renders the external-consensus template with the + combined reports from bold/critique/reducer. + +### Stage configuration + +- `DEFAULT_BACKENDS`, `STAGE_TOOLS`, `STAGE_PERMISSION_MODE`: Default per-stage settings. +- `AGENT_PROMPTS` and `STAGES_WITH_PLAN_GUIDELINE`: Prompt composition inputs. + +## Design Rationale + +- **Session DSL as baseline**: The planner pipeline demonstrates the imperative Session API + with both sequential and parallel stages. +- **Explicit artifacts**: Stage-specific input/output files remain predictable and + match CLI documentation. +- **Reusable consensus stage**: Running consensus separately preserves the `.txt` + artifacts for debate stages while keeping the final plan in markdown. diff --git a/python/agentize/workflow/planner/pipeline.py b/python/agentize/workflow/planner/pipeline.py new file mode 100644 index 0000000..cb5b3a0 --- /dev/null +++ b/python/agentize/workflow/planner/pipeline.py @@ -0,0 +1,331 @@ +"""Planner pipeline orchestration using the Session DSL.""" + +from __future__ import annotations + +import subprocess +from datetime import datetime +from pathlib import Path +from typing import Callable + +from agentize.shell import get_agentize_home +from agentize.workflow.api import run_acw +from agentize.workflow.api import prompt as prompt_utils +from agentize.workflow.api.session import Session, StageResult + + +# ============================================================ +# Stage Configuration +# ============================================================ + +# Stage names in execution order +STAGES = ["understander", "bold", "critique", "reducer", "consensus"] + +# Agent prompt paths (relative to AGENTIZE_HOME) +AGENT_PROMPTS = { + "understander": ".claude-plugin/agents/understander.md", + 
"bold": ".claude-plugin/agents/bold-proposer.md", + "critique": ".claude-plugin/agents/proposal-critique.md", + "reducer": ".claude-plugin/agents/proposal-reducer.md", +} + +# Stages that include plan-guideline content +STAGES_WITH_PLAN_GUIDELINE = {"bold", "critique", "reducer"} + +# Default backends per stage (provider, model) +DEFAULT_BACKENDS = { + "understander": ("claude", "sonnet"), + "bold": ("claude", "opus"), + "critique": ("claude", "opus"), + "reducer": ("claude", "opus"), + "consensus": ("claude", "opus"), +} + +# Tool configurations per stage (Claude provider only) +STAGE_TOOLS = { + "understander": "Read,Grep,Glob", + "bold": "Read,Grep,Glob,WebSearch,WebFetch", + "critique": "Read,Grep,Glob,Bash", + "reducer": "Read,Grep,Glob", + "consensus": "Read,Grep,Glob", +} + +# Permission mode per stage (Claude provider only) +STAGE_PERMISSION_MODE = { + "bold": "plan", +} + + +# ============================================================ +# Prompt Rendering +# ============================================================ + + +def _render_stage_prompt( + stage: str, + feature_desc: str, + agentize_home: Path, + previous_output: str | None = None, +) -> str: + """Render the input prompt for a stage.""" + parts = [] + + if stage in AGENT_PROMPTS: + agent_path = agentize_home / AGENT_PROMPTS[stage] + parts.append(prompt_utils.read_prompt(agent_path, strip_frontmatter=True)) + + if stage in STAGES_WITH_PLAN_GUIDELINE: + plan_guideline_path = ( + agentize_home / ".claude-plugin/skills/plan-guideline/SKILL.md" + ) + if plan_guideline_path.exists(): + parts.append("\n---\n") + parts.append("# Planning Guidelines\n") + parts.append(prompt_utils.read_prompt(plan_guideline_path, strip_frontmatter=True)) + + parts.append("\n---\n") + parts.append("# Feature Request\n") + parts.append(feature_desc) + + if previous_output: + parts.append("\n---\n") + parts.append("# Previous Stage Output\n") + parts.append(previous_output) + + return "\n".join(parts) + + +def 
_build_combined_report( + bold_output: str, + critique_output: str, + reducer_output: str, +) -> str: + """Build the combined report for the consensus template.""" + return f"""## Bold Proposer Output + +{bold_output} + +## Critique Output + +{critique_output} + +## Reducer Output + +{reducer_output} +""" + + +def _render_consensus_prompt( + feature_desc: str, + combined_report: str, + agentize_home: Path, + dest_path: Path, +) -> str: + """Render the consensus prompt with combined report and write to dest_path.""" + template_path = ( + agentize_home + / ".claude-plugin/skills/external-consensus/external-review-prompt.md" + ) + return prompt_utils.render( + template_path, + {"FEATURE_DESCRIPTION": feature_desc, "COMBINED_REPORT": combined_report}, + dest_path, + strip_frontmatter=True, + ) + + +# ============================================================ +# Pipeline Orchestration +# ============================================================ + + +def run_planner_pipeline( + feature_desc: str, + *, + output_dir: str | Path = ".tmp", + backends: dict[str, tuple[str, str]] | None = None, + parallel: bool = True, + runner: Callable[..., subprocess.CompletedProcess] = run_acw, + prefix: str | None = None, + output_suffix: str = "-output.md", + skip_consensus: bool = False, +) -> dict[str, StageResult]: + """Execute the 5-stage planner pipeline.""" + agentize_home = Path(get_agentize_home()) + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + if prefix is None: + prefix = datetime.now().strftime("%Y%m%d-%H%M%S") + + stage_backends = {**DEFAULT_BACKENDS} + if backends: + stage_backends.update(backends) + + session = Session( + output_dir=output_path, + prefix=prefix, + runner=runner, + output_suffix=output_suffix, + ) + + def _log_stage(message: str) -> None: + session._log(message) + + def _backend_label(stage: str) -> str: + provider, model = stage_backends[stage] + return f"{provider}:{model}" + + results: dict[str, StageResult] = 
{} + + understander_prompt = _render_stage_prompt( + "understander", feature_desc, agentize_home + ) + _log_stage(f"Stage 1/5: Running understander ({_backend_label('understander')})") + results["understander"] = session.run_prompt( + "understander", + understander_prompt, + stage_backends["understander"], + tools=STAGE_TOOLS.get("understander"), + permission_mode=STAGE_PERMISSION_MODE.get("understander"), + ) + understander_output = results["understander"].text() + + bold_prompt = _render_stage_prompt( + "bold", feature_desc, agentize_home, understander_output + ) + _log_stage(f"Stage 2/5: Running bold-proposer ({_backend_label('bold')})") + results["bold"] = session.run_prompt( + "bold", + bold_prompt, + stage_backends["bold"], + tools=STAGE_TOOLS.get("bold"), + permission_mode=STAGE_PERMISSION_MODE.get("bold"), + ) + bold_output = results["bold"].text() + + critique_prompt = _render_stage_prompt( + "critique", feature_desc, agentize_home, bold_output + ) + reducer_prompt = _render_stage_prompt( + "reducer", feature_desc, agentize_home, bold_output + ) + + mode_label = "parallel" if parallel else "sequentially" + _log_stage( + f"Stage 3-4/5: Running critique and reducer {mode_label} " + f"({_backend_label('critique')}, {_backend_label('reducer')})" + ) + + if parallel: + parallel_results = session.run_parallel( + [ + session.stage( + "critique", + critique_prompt, + stage_backends["critique"], + tools=STAGE_TOOLS.get("critique"), + permission_mode=STAGE_PERMISSION_MODE.get("critique"), + ), + session.stage( + "reducer", + reducer_prompt, + stage_backends["reducer"], + tools=STAGE_TOOLS.get("reducer"), + permission_mode=STAGE_PERMISSION_MODE.get("reducer"), + ), + ] + ) + results.update(parallel_results) + else: + results["critique"] = session.run_prompt( + "critique", + critique_prompt, + stage_backends["critique"], + tools=STAGE_TOOLS.get("critique"), + permission_mode=STAGE_PERMISSION_MODE.get("critique"), + ) + results["reducer"] = session.run_prompt( + 
"reducer", + reducer_prompt, + stage_backends["reducer"], + tools=STAGE_TOOLS.get("reducer"), + permission_mode=STAGE_PERMISSION_MODE.get("reducer"), + ) + + critique_output = results["critique"].text() + reducer_output = results["reducer"].text() + + if skip_consensus: + return results + + combined_report = _build_combined_report( + bold_output, critique_output, reducer_output + ) + + def _write_consensus_prompt(path: Path) -> str: + return _render_consensus_prompt( + feature_desc, + combined_report, + agentize_home, + path, + ) + + _log_stage(f"Stage 5/5: Running consensus ({_backend_label('consensus')})") + results["consensus"] = session.run_prompt( + "consensus", + _write_consensus_prompt, + stage_backends["consensus"], + tools=STAGE_TOOLS.get("consensus"), + permission_mode=STAGE_PERMISSION_MODE.get("consensus"), + ) + + return results + + +def run_consensus_stage( + feature_desc: str, + *, + bold_path: Path, + critique_path: Path, + reducer_path: Path, + output_dir: Path, + prefix: str, + stage_backends: dict[str, tuple[str, str]], + runner: Callable[..., subprocess.CompletedProcess] = run_acw, +) -> StageResult: + """Run the consensus stage independently.""" + bold_output = bold_path.read_text() + critique_output = critique_path.read_text() + reducer_output = reducer_path.read_text() + agentize_home = Path(get_agentize_home()) + + combined_report = _build_combined_report( + bold_output, + critique_output, + reducer_output, + ) + + input_path = output_dir / f"{prefix}-consensus-input.md" + output_path = output_dir / f"{prefix}-consensus.md" + + def _write_consensus_prompt(path: Path) -> str: + return _render_consensus_prompt( + feature_desc, + combined_report, + agentize_home, + path, + ) + + session = Session(output_dir=output_dir, prefix=prefix, runner=runner) + return session.run_prompt( + "consensus", + _write_consensus_prompt, + stage_backends["consensus"], + tools=STAGE_TOOLS.get("consensus"), + permission_mode=STAGE_PERMISSION_MODE.get("consensus"), + 
input_path=input_path, + output_path=output_path, + ) + + +__all__ = ["run_planner_pipeline", "run_consensus_stage", "StageResult"] diff --git a/python/agentize/workflow/utils/README.md b/python/agentize/workflow/utils/README.md deleted file mode 100644 index 451e242..0000000 --- a/python/agentize/workflow/utils/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# Workflow Utils Package - -Shared helpers for workflow orchestration, covering ACW invocation, GitHub CLI actions, -prompt rendering, and path resolution. - -## Organization - -- `__init__.py` - Convenience re-exports for public helper APIs -- `acw.py` - ACW invocation helpers with timing logs and provider validation -- `gh.py` - GitHub CLI wrappers for issue/label/PR actions -- `prompt.py` - Prompt rendering for `{#TOKEN#}` and `{{TOKEN}}` placeholders -- `path.py` - Path resolution helper relative to a module file -- Companion `.md` files document interfaces and internal helpers diff --git a/python/agentize/workflow/utils/__init__.md b/python/agentize/workflow/utils/__init__.md deleted file mode 100644 index b604c48..0000000 --- a/python/agentize/workflow/utils/__init__.md +++ /dev/null @@ -1,68 +0,0 @@ -# Module: agentize.workflow.utils - -Compatibility exports for workflow utilities plus a stable import surface for helper -modules under `agentize.workflow.utils.*`. - -## External Interfaces - -### `run_acw` - -```python -def run_acw( - provider: str, - model: str, - input_file: str | Path, - output_file: str | Path, - *, - tools: str | None = None, - permission_mode: str | None = None, - extra_flags: list[str] | None = None, - timeout: int = 3600, - cwd: str | Path | None = None, - env: dict[str, str] | None = None, -) -> subprocess.CompletedProcess -``` - -Re-export of `agentize.workflow.utils.acw.run_acw`. - -### `list_acw_providers` - -```python -def list_acw_providers() -> list[str] -``` - -Re-export of `agentize.workflow.utils.acw.list_acw_providers`. 
- -### `ACW` - -```python -class ACW: - def __init__( - self, - name: str, - provider: str, - model: str, - timeout: int = 900, - *, - tools: str | None = None, - permission_mode: str | None = None, - extra_flags: list[str] | None = None, - log_writer: Callable[[str], None] | None = None, - runner: Callable[..., subprocess.CompletedProcess] | None = None, - ) -> None: ... - def run(self, input_file: str | Path, output_file: str | Path) -> subprocess.CompletedProcess: ... -``` - -Re-export of `agentize.workflow.utils.acw.ACW`. - -## Internal Helpers - -This module only re-exports selected helpers and does not define its own internal -implementation. - -## Design Rationale - -- **Stable imports**: A single import path keeps workflow code and tests stable while - the helper modules remain organized under a package. -- **Focused surface**: Re-exports stay limited to the ACW helpers to avoid accidental - coupling to internal convenience utilities. diff --git a/python/agentize/workflow/utils/__init__.py b/python/agentize/workflow/utils/__init__.py deleted file mode 100644 index 9e3871a..0000000 --- a/python/agentize/workflow/utils/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -"""Workflow helper package for ACW invocation and orchestration utilities.""" - -from __future__ import annotations - -from agentize.workflow.utils.acw import ACW, list_acw_providers, run_acw - -__all__ = ["ACW", "list_acw_providers", "run_acw"] diff --git a/python/tests/test_planner_workflow.py b/python/tests/test_planner_workflow.py index 95fd12a..3ea053d 100644 --- a/python/tests/test_planner_workflow.py +++ b/python/tests/test_planner_workflow.py @@ -26,9 +26,9 @@ # Additional import path tests (these will be exercised in dedicated tests below) try: - from agentize.workflow.utils import run_acw as utils_run_acw + from agentize.workflow.api import run_acw as api_run_acw except ImportError: - utils_run_acw = None + api_run_acw = None try: from agentize.workflow.planner import run_planner_pipeline as 
planner_run_pipeline, StageResult as planner_StageResult @@ -356,17 +356,17 @@ class TestACWRunner: @pytest.mark.skipif(ACW is None, reason="Implementation not yet available") def test_invalid_provider_raises(self, monkeypatch): """ACW raises ValueError when provider is not in completion list.""" - from agentize.workflow.utils import ACW as utils_ACW + from agentize.workflow.api import ACW as api_ACW - monkeypatch.setattr("agentize.workflow.utils.acw.list_acw_providers", lambda: ["claude"]) + monkeypatch.setattr("agentize.workflow.api.acw.list_acw_providers", lambda: ["claude"]) with pytest.raises(ValueError, match="provider"): - utils_ACW(name="test", provider="codex", model="gpt") + api_ACW(name="test", provider="codex", model="gpt") @pytest.mark.skipif(ACW is None, reason="Implementation not yet available") def test_custom_runner_invoked(self, monkeypatch, tmp_path: Path): """ACW with custom runner invokes the custom runner, not run_acw.""" - from agentize.workflow.utils import ACW as utils_ACW + from agentize.workflow.api import ACW as api_ACW invocations = [] @@ -384,13 +384,13 @@ def _custom_runner( invocations.append({"provider": provider, "model": model}) return subprocess.CompletedProcess(args=["custom"], returncode=0) - monkeypatch.setattr("agentize.workflow.utils.acw.list_acw_providers", lambda: ["claude"]) + monkeypatch.setattr("agentize.workflow.api.acw.list_acw_providers", lambda: ["claude"]) input_path = tmp_path / "input.md" output_path = tmp_path / "output.md" input_path.write_text("prompt") - runner = utils_ACW( + runner = api_ACW( name="test", provider="claude", model="sonnet", @@ -406,7 +406,7 @@ def _custom_runner( @pytest.mark.skipif(ACW is None, reason="Implementation not yet available") def test_run_logs_and_invokes_acw(self, monkeypatch, tmp_path: Path): """ACW.run logs start/finish lines and calls run_acw with expected args.""" - from agentize.workflow.utils import ACW as utils_ACW + from agentize.workflow.api import ACW as api_ACW 
invocations = [] @@ -438,9 +438,9 @@ def _fake_run_acw( def _fake_time() -> float: return times.pop(0) - monkeypatch.setattr("agentize.workflow.utils.acw.list_acw_providers", lambda: ["claude"]) - monkeypatch.setattr("agentize.workflow.utils.acw.run_acw", _fake_run_acw) - monkeypatch.setattr("agentize.workflow.utils.acw.time.time", _fake_time) + monkeypatch.setattr("agentize.workflow.api.acw.list_acw_providers", lambda: ["claude"]) + monkeypatch.setattr("agentize.workflow.api.acw.run_acw", _fake_run_acw) + monkeypatch.setattr("agentize.workflow.api.acw.time.time", _fake_time) logs: list[str] = [] log_writer = logs.append @@ -449,7 +449,7 @@ def _fake_time() -> float: output_path = tmp_path / "output.md" input_path.write_text("prompt") - runner = utils_ACW( + runner = api_ACW( name="understander", provider="claude", model="sonnet", @@ -517,10 +517,10 @@ def test_workflow_backward_compat_imports(self): assert StageResult is not None assert run_acw is not None - @pytest.mark.skipif(utils_run_acw is None, reason="Implementation not yet available") - def test_utils_direct_imports(self): - """Imports from agentize.workflow.utils work.""" - from agentize.workflow.utils import run_acw + @pytest.mark.skipif(api_run_acw is None, reason="Implementation not yet available") + def test_api_direct_imports(self): + """Imports from agentize.workflow.api work.""" + from agentize.workflow.api import run_acw assert run_acw is not None @pytest.mark.skipif(planner_run_pipeline is None, reason="Implementation not yet available") @@ -530,12 +530,12 @@ def test_planner_package_imports(self): assert run_planner_pipeline is not None assert StageResult is not None - @pytest.mark.skipif(run_acw is None or utils_run_acw is None, reason="Implementation not yet available") + @pytest.mark.skipif(run_acw is None or api_run_acw is None, reason="Implementation not yet available") def test_run_acw_same_function(self): - """run_acw from workflow and workflow.utils is the same function.""" + """run_acw 
from workflow and workflow.api is the same function.""" from agentize.workflow import run_acw as workflow_run_acw - from agentize.workflow.utils import run_acw as utils_run_acw - assert workflow_run_acw is utils_run_acw + from agentize.workflow.api import run_acw as api_run_acw + assert workflow_run_acw is api_run_acw @pytest.mark.skipif(run_planner_pipeline is None or planner_run_pipeline is None, reason="Implementation not yet available") def test_run_planner_pipeline_same_function(self): diff --git a/python/tests/test_workflow_session.py b/python/tests/test_workflow_session.py new file mode 100644 index 0000000..64ac83a --- /dev/null +++ b/python/tests/test_workflow_session.py @@ -0,0 +1,129 @@ +"""Tests for the Session DSL in agentize.workflow.api.""" + +import subprocess +import threading +from pathlib import Path + +import pytest + +try: + from agentize.workflow.api import PipelineError, Session, StageResult +except ImportError: + PipelineError = None + Session = None + StageResult = None + + +@pytest.mark.skipif(Session is None, reason="Implementation not yet available") +def test_run_prompt_retries_on_missing_output(tmp_path: Path): + """run_prompt retries when output is missing and succeeds on retry.""" + calls: list[dict[str, str]] = [] + + def _runner( + provider: str, + model: str, + input_file: str | Path, + output_file: str | Path, + *, + tools: str | None = None, + permission_mode: str | None = None, + extra_flags: list[str] | None = None, + timeout: int = 900, + ) -> subprocess.CompletedProcess: + calls.append({"provider": provider, "model": model}) + if len(calls) == 1: + return subprocess.CompletedProcess(args=["stub"], returncode=0) + Path(output_file).write_text("ok") + return subprocess.CompletedProcess(args=["stub"], returncode=0) + + session = Session(output_dir=tmp_path, prefix="retry", runner=_runner) + result = session.run_prompt( + "stage", + "hello", + ("claude", "sonnet"), + retry=1, + retry_delay=0, + ) + + assert 
result.output_path.exists() + assert result.output_path.read_text().strip() == "ok" + assert len(calls) == 2 + + +@pytest.mark.skipif(PipelineError is None or Session is None, reason="Implementation not yet available") +def test_run_prompt_raises_after_retries(tmp_path: Path): + """run_prompt raises PipelineError after exhausting retries.""" + def _runner( + provider: str, + model: str, + input_file: str | Path, + output_file: str | Path, + *, + tools: str | None = None, + permission_mode: str | None = None, + extra_flags: list[str] | None = None, + timeout: int = 900, + ) -> subprocess.CompletedProcess: + return subprocess.CompletedProcess(args=["stub"], returncode=1) + + session = Session(output_dir=tmp_path, prefix="fail", runner=_runner) + + with pytest.raises(PipelineError): + session.run_prompt( + "stage", + "hello", + ("claude", "sonnet"), + retry=1, + retry_delay=0, + ) + + +@pytest.mark.skipif(StageResult is None, reason="Implementation not yet available") +def test_stage_result_text_reads_output(tmp_path: Path): + """StageResult.text returns the output file contents.""" + output_path = tmp_path / "output.md" + output_path.write_text("content") + result = StageResult( + stage="test", + input_path=tmp_path / "input.md", + output_path=output_path, + process=subprocess.CompletedProcess(args=[], returncode=0), + ) + + assert result.text() == "content" + + +@pytest.mark.skipif(Session is None, reason="Implementation not yet available") +def test_run_parallel_returns_mapping(tmp_path: Path): + """run_parallel returns results keyed by stage name.""" + lock = threading.Lock() + seen: list[str] = [] + + def _runner( + provider: str, + model: str, + input_file: str | Path, + output_file: str | Path, + *, + tools: str | None = None, + permission_mode: str | None = None, + extra_flags: list[str] | None = None, + timeout: int = 900, + ) -> subprocess.CompletedProcess: + with lock: + seen.append(str(output_file)) + 
Path(output_file).write_text(f"done:{Path(output_file).name}") + return subprocess.CompletedProcess(args=["stub"], returncode=0) + + session = Session(output_dir=tmp_path, prefix="parallel", runner=_runner) + calls = [ + session.stage("critique", "prompt A", ("claude", "opus")), + session.stage("reducer", "prompt B", ("claude", "opus")), + ] + + results = session.run_parallel(calls, max_workers=2) + + assert set(results.keys()) == {"critique", "reducer"} + assert results["critique"].text().startswith("done:") + assert results["reducer"].text().startswith("done:") + assert len(seen) == 2