Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion researchclaw/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,15 @@ def cmd_run(args: argparse.Namespace) -> int:
)

done = sum(1 for r in results if r.status.value == "done")
paused = sum(1 for r in results if r.status.value == "paused")
failed = sum(1 for r in results if r.status.value == "failed")
print(f"\nPipeline complete: {done}/{len(results)} stages done, {failed} failed")
if paused:
print(
f"\nPipeline paused: {done}/{len(results)} stages done, "
f"{paused} paused, {failed} failed"
)
else:
print(f"\nPipeline complete: {done}/{len(results)} stages done, {failed} failed")
return 0 if failed == 0 else 1


Expand Down
7 changes: 6 additions & 1 deletion researchclaw/pipeline/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,12 @@ def _build_fallback_queries(topic: str) -> list[str]:
def _write_stage_meta(
stage_dir: Path, stage: Stage, run_id: str, result: "StageResult"
) -> None:
next_stage = NEXT_STAGE[stage]
if result.status is StageStatus.DONE:
next_stage = NEXT_STAGE[stage]
else:
# Failed / paused / blocked stages should point back to themselves so
# retry-resume tooling does not imply that the pipeline advanced.
next_stage = stage
meta = {
"stage_id": f"{int(stage):02d}-{stage.name.lower()}",
"run_id": run_id,
Expand Down
14 changes: 14 additions & 0 deletions researchclaw/pipeline/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def _build_pipeline_summary(
"run_id": run_id,
"stages_executed": len(results),
"stages_done": sum(1 for item in results if item.status == StageStatus.DONE),
"stages_paused": sum(
1 for item in results if item.status == StageStatus.PAUSED
),
"stages_blocked": sum(
1 for item in results if item.status == StageStatus.BLOCKED_APPROVAL
),
Expand Down Expand Up @@ -465,6 +468,9 @@ def execute_pipeline(
print(f"{prefix} {stage.name} — FAILED ({elapsed:.1f}s) — {err}")
elif result.status == StageStatus.BLOCKED_APPROVAL:
print(f"{prefix} {stage.name} — blocked (awaiting approval)")
elif result.status == StageStatus.PAUSED:
err = result.error or "paused"
print(f"{prefix} {stage.name} — PAUSED ({elapsed:.1f}s) — {err}")
results.append(result)

if kb_root is not None and result.status == StageStatus.DONE:
Expand Down Expand Up @@ -604,6 +610,14 @@ def execute_pipeline(
logger.warning("Noncritical stage %s failed - skipping", stage.name)
else:
break
if result.status == StageStatus.PAUSED:
logger.warning(
"[%s] Pipeline paused at %s: %s",
run_id,
stage.name,
result.error or result.decision,
)
break
if result.status == StageStatus.BLOCKED_APPROVAL and stop_on_gate:
break

Expand Down
98 changes: 84 additions & 14 deletions researchclaw/pipeline/stage_impls/_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,41 @@ def _files_to_context(project_files: dict[str, str]) -> str:
parts.append(f"```filename:{fname}\n{code}\n```")
return "\n\n".join(parts)

def _write_refinement_log() -> None:
    """Persist the in-memory refinement ``log`` as pretty-printed JSON in the stage dir."""
    log_path = stage_dir / "refinement_log.json"
    log_path.write_text(json.dumps(log, indent=2), encoding="utf-8")

def _pause_refinement(
    *,
    reason: str,
    stop_reason: str,
    iteration: int | None = None,
) -> StageResult:
    """Record a pause in the refinement log and build a PAUSED stage result.

    Args:
        reason: Human-readable cause of the pause; also becomes the
            ``StageResult.error`` text.
        stop_reason: Machine-readable code stored as ``log["stop_reason"]``
            (e.g. "acp_prompt_timeout").
        iteration: Refinement iteration at which the pause happened, if known;
            recorded as ``log["pause_iteration"]``.

    Returns:
        A ``StageResult`` with status PAUSED and decision "resume", pointing at
        the written ``refinement_log.json`` as evidence.
    """
    # Mark the log as paused (not converged) and snapshot the best result so
    # far so a later resume can pick up from the same state.
    log.update(
        {
            "paused": True,
            "converged": False,
            "stop_reason": stop_reason,
            "pause_reason": reason,
            "best_metric": best_metric,
            "best_version": best_version,
            "iterations_completed": len(log["iterations"]),
        }
    )
    if iteration is not None:
        log["pause_iteration"] = iteration
    # Flush to disk before returning so resume tooling sees the pause state
    # even if the process exits right after this stage result is produced.
    _write_refinement_log()
    artifacts = ("refinement_log.json",)
    return StageResult(
        stage=Stage.ITERATIVE_REFINE,
        status=StageStatus.PAUSED,
        artifacts=artifacts,
        error=reason,
        # "resume" signals that this stage should be re-entered, not skipped.
        decision="resume",
        evidence_refs=tuple(f"stage-13/{a}" for a in artifacts),
    )

if llm is None:
logger.info("Stage 13: LLM unavailable, saving original experiment as final")
final_dir = stage_dir / "experiment_final"
Expand All @@ -677,9 +712,7 @@ def _files_to_context(project_files: dict[str, str]) -> str:
],
}
)
(stage_dir / "refinement_log.json").write_text(
json.dumps(log, indent=2), encoding="utf-8"
)
_write_refinement_log()
artifacts = ("refinement_log.json", "experiment_final/")
return StageResult(
stage=Stage.ITERATIVE_REFINE,
Expand Down Expand Up @@ -803,12 +836,25 @@ def _files_to_context(project_files: dict[str, str]) -> str:
timeout_refine_attempts,
)

response = _chat_with_prompt(
llm,
ip.system,
user_prompt,
max_tokens=ip.max_tokens or 8192,
)
try:
response = _chat_with_prompt(
llm,
ip.system,
user_prompt,
max_tokens=ip.max_tokens or 8192,
)
except RuntimeError as exc:
if "ACP prompt timed out after" in str(exc):
logger.warning(
"Stage 13: ACP prompt timed out during iteration %d; pausing for resume",
iteration,
)
return _pause_refinement(
reason=str(exc),
stop_reason="acp_prompt_timeout",
iteration=iteration,
)
raise
extracted_files = _extract_multi_file_blocks(response.content)
# If LLM returns only single block, treat as main.py update
if not extracted_files:
Expand Down Expand Up @@ -865,7 +911,20 @@ def _files_to_context(project_files: dict[str, str]) -> str:
issue_text=issue_text,
all_files_ctx=_files_to_context(candidate_files),
)
repair_response = _chat_with_prompt(llm, irp.system, irp.user)
try:
repair_response = _chat_with_prompt(llm, irp.system, irp.user)
except RuntimeError as exc:
if "ACP prompt timed out after" in str(exc):
logger.warning(
"Stage 13: ACP repair prompt timed out during iteration %d; pausing for resume",
iteration,
)
return _pause_refinement(
reason=str(exc),
stop_reason="acp_prompt_timeout",
iteration=iteration,
)
raise
candidate_files["main.py"] = _extract_code_block(repair_response.content)
validation = validate_code(candidate_files["main.py"])
repaired = True
Expand Down Expand Up @@ -977,7 +1036,20 @@ def _files_to_context(project_files: dict[str, str]) -> str:
issue_text=runtime_issues,
all_files_ctx=_files_to_context(candidate_files),
)
repair_resp = _chat_with_prompt(llm, rrp.system, rrp.user)
try:
repair_resp = _chat_with_prompt(llm, rrp.system, rrp.user)
except RuntimeError as exc:
if "ACP prompt timed out after" in str(exc):
logger.warning(
"Stage 13: ACP runtime-repair prompt timed out during iteration %d; pausing for resume",
iteration,
)
return _pause_refinement(
reason=str(exc),
stop_reason="acp_prompt_timeout",
iteration=iteration,
)
raise
repaired_files = _extract_multi_file_blocks(repair_resp.content)
if not repaired_files:
single = _extract_code_block(repair_resp.content)
Expand Down Expand Up @@ -1067,9 +1139,7 @@ def _files_to_context(project_files: dict[str, str]) -> str:
)
if _all_ablation_identical:
log["ablation_identical_warning"] = True
(stage_dir / "refinement_log.json").write_text(
json.dumps(log, indent=2), encoding="utf-8"
)
_write_refinement_log()

artifacts = ["refinement_log.json", "experiment_final/"]
artifacts.extend(
Expand Down
51 changes: 51 additions & 0 deletions tests/test_rc_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from researchclaw import cli as rc_cli
from researchclaw.config import resolve_config_path
from researchclaw.pipeline.executor import StageResult
from researchclaw.pipeline.stages import Stage, StageStatus


def _write_valid_config(path: Path) -> None:
Expand Down Expand Up @@ -100,6 +102,55 @@ def test_cmd_validate_valid_config_returns_zero(
assert "Config validation passed" in capsys.readouterr().out


def test_cmd_run_reports_paused_pipeline(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """A paused pipeline is reported as paused and still exits with code 0."""
    config_path = tmp_path / "config.yaml"
    _write_valid_config(config_path)
    output_dir = tmp_path / "artifacts" / "paused-run"

    from researchclaw.pipeline import runner as rc_runner

    def _fake_execute_pipeline(**_kwargs):
        # One completed stage followed by a pause caused by an ACP timeout.
        return [
            StageResult(
                stage=Stage.TOPIC_INIT,
                status=StageStatus.DONE,
                artifacts=("goal.md",),
            ),
            StageResult(
                stage=Stage.PROBLEM_DECOMPOSE,
                status=StageStatus.PAUSED,
                artifacts=("refinement_log.json",),
                error="ACP prompt timed out after 1800s",
                decision="resume",
            ),
        ]

    monkeypatch.setattr(rc_runner, "execute_pipeline", _fake_execute_pipeline)
    monkeypatch.setattr(rc_runner, "read_checkpoint", lambda run_dir: None)

    args = argparse.Namespace(
        config=str(config_path),
        topic=None,
        output=str(output_dir),
        from_stage=None,
        auto_approve=False,
        skip_preflight=True,
        resume=False,
        skip_noncritical_stage=False,
        no_graceful_degradation=False,
    )

    exit_code = rc_cli.cmd_run(args)
    captured = capsys.readouterr()

    # No stage failed, so the exit code stays 0 even though the run paused.
    assert exit_code == 0
    assert "Pipeline paused:" in captured.out
    assert "1 paused" in captured.out


def test_main_dispatches_run_command(monkeypatch: pytest.MonkeyPatch) -> None:
captured = {}

Expand Down
62 changes: 62 additions & 0 deletions tests/test_rc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,29 @@ def test_write_stage_meta_writes_expected_json(run_dir: Path) -> None:
assert re.match(r"\d{4}-\d{2}-\d{2}T", payload["ts"])


def test_write_stage_meta_keeps_paused_stage_as_next_stage(run_dir: Path) -> None:
    """A paused stage must record itself as next_stage so resume retries it."""
    stage_dir = run_dir / "stage-02"
    stage_dir.mkdir()

    paused_result = rc_executor.StageResult(
        stage=Stage.PROBLEM_DECOMPOSE,
        status=StageStatus.PAUSED,
        artifacts=("refinement_log.json",),
        decision="resume",
        error="ACP prompt timed out after 1800s",
        evidence_refs=("stage-02/refinement_log.json",),
    )
    rc_executor._write_stage_meta(
        stage_dir, Stage.PROBLEM_DECOMPOSE, "run-paused", paused_result
    )

    raw_meta = (stage_dir / "decision.json").read_text(encoding="utf-8")
    payload = cast(dict[str, Any], json.loads(raw_meta))

    assert payload["status"] == "paused"
    assert payload["decision"] == "resume"
    # next_stage points back at the paused stage, not the one after it.
    assert payload["next_stage"] == int(Stage.PROBLEM_DECOMPOSE)


def test_execute_stage_creates_stage_dir_writes_artifacts_and_meta(
monkeypatch: pytest.MonkeyPatch,
run_dir: Path,
Expand Down Expand Up @@ -751,6 +774,45 @@ def test_refine_no_llm_saves_original_as_final(
assert payload["stop_reason"] == "llm_unavailable"
assert result.status == StageStatus.DONE

def test_refine_acp_timeout_pauses_for_resume(
    self,
    run_dir: Path,
    rc_config: RCConfig,
    adapters: AdapterBundle,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An ACP prompt timeout pauses stage 13 instead of finalizing output."""
    self._prepare_refine_inputs(run_dir)
    stage_dir = run_dir / "stage-13"
    stage_dir.mkdir(parents=True, exist_ok=True)

    from researchclaw.pipeline.stage_impls import _execution as execution_impl

    def _raise_timeout(*_args, **_kwargs):
        raise RuntimeError("ACP prompt timed out after 1800s")

    monkeypatch.setattr(execution_impl, "_chat_with_prompt", _raise_timeout)

    result = rc_executor._execute_iterative_refine(
        stage_dir,
        run_dir,
        rc_config,
        adapters,
        llm=FakeLLMClient("unused"),
    )

    log_payload = json.loads(
        (stage_dir / "refinement_log.json").read_text(encoding="utf-8")
    )

    assert result.status == StageStatus.PAUSED
    assert result.decision == "resume"
    assert result.artifacts == ("refinement_log.json",)
    assert log_payload["paused"] is True
    assert log_payload["stop_reason"] == "acp_prompt_timeout"
    assert log_payload["pause_iteration"] == 1
    assert log_payload["best_version"] == "experiment/"
    # The final experiment directory must not be produced on a pause.
    assert not (stage_dir / "experiment_final").exists()

def test_refine_with_llm_generates_improved_code(
self,
run_dir: Path,
Expand Down
Loading