Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 110 additions & 3 deletions api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,114 @@ def _profile_home_for_cron_job(job: dict):
return get_hermes_home_for_profile(raw)


def _cron_job_subprocess_main(job, execution_profile_home, result_queue):
"""Run one cron job inside a child process pinned to a profile home."""
try:
def _run():
from cron.scheduler import run_job

return run_job(job)

if execution_profile_home is None:
result = _run()
else:
from api.profiles import cron_profile_context_for_home

with cron_profile_context_for_home(execution_profile_home):
result = _run()
result_queue.put(("ok", result))
except BaseException as exc: # pragma: no cover - surfaced in parent
import traceback

result_queue.put(("error", f"{type(exc).__name__}: {exc}", traceback.format_exc()))


def _cron_subprocess_result_timeout_seconds(job):
"""Return how long the manual-run parent waits for child result payloads."""
for key in ("timeout_seconds", "max_runtime_seconds", "timeout"):
raw = (job or {}).get(key)
if raw in (None, ""):
continue
try:
value = float(raw)
except (TypeError, ValueError):
continue
if value > 0:
return max(60.0, value + 30.0)
# Manual cron jobs can legitimately run for a long time. Keep a recovery
# path for wedged children without truncating normal long-running jobs.
return 6 * 60 * 60.0


def _run_cron_job_in_profile_subprocess(job, execution_profile_home):
    """Execute cron.scheduler.run_job without holding the parent cron env lock.

    cron.scheduler/cron.jobs still rely on process-global HERMES_HOME and module
    constants, so running the job body in a child process gives each long cron
    execution its own globals. The parent process only uses cron_profile_context
    for short metadata reads/writes and remains responsive to unrelated cron UI
    and API calls while the job runs.

    Returns the child's run_job result on success; raises RuntimeError when the
    child errored, produced no result in time, or died without reporting one.
    """
    import multiprocessing
    import queue

    # NOTE(review): "fork" assumes a POSIX host — confirm Windows is out of
    # scope. fork also lets the child inherit already-loaded modules cheaply.
    ctx = multiprocessing.get_context("fork")
    result_queue = ctx.Queue(maxsize=1)
    process = ctx.Process(
        target=_cron_job_subprocess_main,
        args=(job, execution_profile_home, result_queue),
    )
    process.start()

    result_timeout = _cron_subprocess_result_timeout_seconds(job)
    # Child protocol: ("ok", result) or ("error", message, traceback_text).
    # These defaults cover a child that dies before putting anything.
    status = "error"
    payload = ["cron run subprocess failed before producing a result", ""]
    try:
        try:
            # Drain the potentially large pickled result before joining. If the
            # child puts >~64 KiB on a multiprocessing.Queue, joining first can
            # deadlock while the child's feeder thread waits for the parent to
            # read from the pipe.
            status, *payload = result_queue.get(timeout=result_timeout)
        except queue.Empty:
            status = "error"
            if process.is_alive():
                # Child is wedged: reclaim it and report the timeout.
                process.terminate()
                process.join(timeout=5)
                payload = [
                    f"cron run subprocess produced no result within {result_timeout:g}s and was terminated",
                    "",
                ]
            else:
                # Child exited (crash, os._exit, external kill) without reporting.
                payload = [
                    f"cron run subprocess exited with code {process.exitcode} without producing a result",
                    "",
                ]
        finally:
            process.join(timeout=5)
            if process.is_alive():
                # The child reported a result but kept running (e.g. a stuck
                # non-daemon thread). Force it down, and downgrade an "ok" so
                # callers do not trust output from a process we had to kill.
                process.terminate()
                process.join(timeout=5)
                if status == "ok":
                    status = "error"
                    payload = [
                        "cron run subprocess did not exit after returning a result",
                        "",
                    ]
    finally:
        result_queue.close()
        result_queue.join_thread()

    if status == "ok":
        return payload[0]

    message = payload[0]
    traceback_text = payload[1] if len(payload) > 1 else ""
    if traceback_text:
        # Full child traceback goes to the log; callers only see the summary.
        logger.error("Manual cron subprocess failed:\n%s", traceback_text)
    raise RuntimeError(message)


def _run_cron_tracked(job, profile_home=None, execution_profile_home=None):
"""Wrapper that tracks running state around cron.scheduler.run_job.

Expand All @@ -321,7 +429,6 @@ def _run_cron_tracked(job, profile_home=None, execution_profile_home=None):
agent config/.env while running. When no job profile is selected, both homes
are the same and legacy server-default behavior is preserved.
"""
from cron.scheduler import run_job # import here — runs inside a worker thread
from cron.jobs import mark_job_run, save_job_output

job_id = job.get("id", "")
Expand All @@ -336,8 +443,8 @@ def _with_cron_home(home, fn):
return fn()

try:
success, output, final_response, error = _with_cron_home(
execution_profile_home, lambda: run_job(job)
success, output, final_response, error = _run_cron_job_in_profile_subprocess(
job, execution_profile_home
)

# Persist output and run metadata back to the job's owning cron store,
Expand Down
15 changes: 10 additions & 5 deletions tests/test_cron_run_job_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class TestRunCronTrackedImport:
"""_run_cron_tracked must be self-contained — it runs in a worker thread."""

def test_run_job_imported_inside_function(self):
"""run_job must be imported inside _run_cron_tracked, not relied on
"""run_job must be imported inside the subprocess target, not relied on
from a caller's local scope."""
src = _get_function_source("_run_cron_tracked")
src = _get_function_source("_cron_job_subprocess_main")
tree = ast.parse(src)
names_used = set()

Expand Down Expand Up @@ -86,7 +86,12 @@ def test_handle_cron_run_does_not_import_run_job(self):
"_run_cron_tracked to avoid the NameError in worker threads."
)

def test_run_cron_tracked_calls_run_job(self):
"""Sanity: the function still actually calls run_job."""
def test_run_cron_tracked_calls_run_job_helper(self):
"""Sanity: the function still delegates to the cron job runner."""
src = _get_function_source("_run_cron_tracked")
assert "run_job" in src, "_run_cron_tracked should call run_job"
assert "_run_cron_job_in_profile_subprocess" in src

def test_cron_subprocess_target_calls_run_job(self):
"""Sanity: the subprocess target still actually calls run_job."""
src = _get_function_source("_cron_job_subprocess_main")
assert "run_job" in src, "cron subprocess target should call run_job"
193 changes: 193 additions & 0 deletions tests/test_issue1574_cron_profile_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import multiprocessing
import sys
import threading
import types
from pathlib import Path


def _install_fake_cron(monkeypatch, run_job, events):
cron_pkg = types.ModuleType("cron")
cron_pkg.__path__ = []

cron_jobs = types.ModuleType("cron.jobs")
cron_jobs.HERMES_DIR = Path("/tmp/hermes")
cron_jobs.CRON_DIR = cron_jobs.HERMES_DIR / "cron"
cron_jobs.JOBS_FILE = cron_jobs.CRON_DIR / "jobs.json"
cron_jobs.OUTPUT_DIR = cron_jobs.CRON_DIR / "output"
cron_jobs.save_job_output = lambda job_id, output: events.append(("save", job_id, output))
cron_jobs.mark_job_run = lambda job_id, success, error=None: events.append(("mark", job_id, success, error))

cron_scheduler = types.ModuleType("cron.scheduler")
cron_scheduler._hermes_home = Path("/tmp/hermes")
cron_scheduler._LOCK_DIR = cron_scheduler._hermes_home / "cron"
cron_scheduler._LOCK_FILE = cron_scheduler._LOCK_DIR / ".tick.lock"
cron_scheduler.run_job = run_job

monkeypatch.setitem(sys.modules, "cron", cron_pkg)
monkeypatch.setitem(sys.modules, "cron.jobs", cron_jobs)
monkeypatch.setitem(sys.modules, "cron.scheduler", cron_scheduler)
return cron_jobs, cron_scheduler


def _write_fake_large_payload_cron_package(root: Path):
cron_dir = root / "cron"
cron_dir.mkdir(parents=True)
(cron_dir / "__init__.py").write_text("", encoding="utf-8")
(cron_dir / "jobs.py").write_text(
"from pathlib import Path\n"
"HERMES_DIR = Path('/tmp/hermes')\n"
"CRON_DIR = HERMES_DIR / 'cron'\n"
"JOBS_FILE = CRON_DIR / 'jobs.json'\n"
"OUTPUT_DIR = CRON_DIR / 'output'\n",
encoding="utf-8",
)
(cron_dir / "scheduler.py").write_text(
"from pathlib import Path\n"
"_hermes_home = Path('/tmp/hermes')\n"
"_LOCK_DIR = _hermes_home / 'cron'\n"
"_LOCK_FILE = _LOCK_DIR / '.tick.lock'\n"
"def run_job(job):\n"
" payload = 'x' * 200_000\n"
" return True, payload, payload, None\n",
encoding="utf-8",
)


def _large_cron_payload_runner(fake_pkg_root, profile_home, result_queue):
try:
import api.routes as routes

# api.routes/config may prepend the real hermes-agent path while importing.
# Re-prepend the fake cron package afterward and clear any already-loaded
# cron modules so the helper's child process imports the large-payload fake.
sys.path.insert(0, str(fake_pkg_root))
for module_name in ("cron.scheduler", "cron.jobs", "cron"):
sys.modules.pop(module_name, None)

success, output, final_response, error = routes._run_cron_job_in_profile_subprocess(
{"id": "large-payload"}, Path(profile_home)
)
result_queue.put(("ok", success, len(output), len(final_response), error))
except BaseException as exc: # pragma: no cover - surfaced in parent process
import traceback

result_queue.put(("error", repr(exc), traceback.format_exc()))


def test_manual_cron_subprocess_drains_large_result_before_join(tmp_path):
    """A >100 KB result must not deadlock the parent before it can persist output."""
    fake_pkg_root = tmp_path / "fake-cron-pkg"
    _write_fake_large_payload_cron_package(fake_pkg_root)

    # Use fork only for the outer test harness so this pytest module does not
    # need to be importable as a package. The product helper under test owns its
    # own multiprocessing context.
    fork_ctx = multiprocessing.get_context("fork")
    outcome_queue = fork_ctx.Queue()
    harness = fork_ctx.Process(
        target=_large_cron_payload_runner,
        args=(fake_pkg_root, tmp_path / "exec-profile", outcome_queue),
    )
    harness.start()
    harness.join(10)

    if harness.is_alive():
        # Still running after 10s means the helper joined before draining.
        harness.terminate()
        harness.join(5)
        outcome_queue.close()
        outcome_queue.join_thread()
        raise AssertionError(
            "manual cron subprocess deadlocked on a >100 KB Queue payload; "
            "the parent must drain result_queue before process.join()"
        )

    try:
        outcome = outcome_queue.get(timeout=2)
    finally:
        outcome_queue.close()
        outcome_queue.join_thread()
    assert outcome == ("ok", True, 200_000, 200_000, None)


def test_manual_cron_run_does_not_hold_profile_lock_for_job_duration(tmp_path, monkeypatch):
    """A long manual run must not freeze unrelated cron/profile operations.

    The parent WebUI process still needs the cron profile lock for short metadata
    writes, but the potentially minutes-long run_job body should execute outside
    that process-wide critical section.
    """
    import api.routes as routes
    from api.profiles import cron_profile_context_for_home

    events = []
    job_started = threading.Event()
    allow_finish = threading.Event()

    def fake_run_job_subprocess(job, execution_profile_home):
        events.append(("run", job["id"], str(execution_profile_home)))
        job_started.set()
        # Hold the "job" open until the contender has had its chance to run.
        assert allow_finish.wait(2), "test timed out waiting to release fake cron run"
        return True, "output", "final", None

    _install_fake_cron(monkeypatch, lambda job: (True, "unused", "unused", None), events)
    monkeypatch.setattr(routes, "_run_cron_job_in_profile_subprocess", fake_run_job_subprocess)

    owner_home = tmp_path / "owner"
    exec_home = tmp_path / "exec"
    unrelated_home = tmp_path / "other"

    routes._mark_cron_running("job1574")
    runner_thread = threading.Thread(
        target=routes._run_cron_tracked,
        args=({"id": "job1574"}, owner_home, exec_home),
    )
    runner_thread.start()
    assert job_started.wait(2), "fake run_job did not start"

    lock_acquired = threading.Event()

    def contend_for_lock():
        # Represents an unrelated short profile operation in another thread.
        with cron_profile_context_for_home(unrelated_home):
            events.append(("contender", str(unrelated_home)))
            lock_acquired.set()

    contender_thread = threading.Thread(target=contend_for_lock)
    contender_thread.start()

    assert lock_acquired.wait(0.5), (
        "cron_profile_context_for_home stayed blocked while run_job was active; "
        "the global cron profile lock is still held for the full job duration"
    )

    allow_finish.set()
    runner_thread.join(2)
    contender_thread.join(2)

    assert not runner_thread.is_alive()
    assert not contender_thread.is_alive()
    assert ("run", "job1574", str(exec_home)) in events
    assert ("save", "job1574", "output") in events
    assert ("mark", "job1574", True, None) in events
    assert routes._is_cron_running("job1574") == (False, 0.0)


def test_cron_job_subprocess_executes_under_selected_profile_home(tmp_path, monkeypatch):
    """The child sees the selected profile home; the parent's globals stay untouched."""
    import api.routes as routes

    def fake_run_job(job):
        import cron.scheduler as scheduler

        # Echo back whatever hermes home the (child) process observes.
        return True, str(scheduler._hermes_home), "final", None

    recorded_events = []
    _, fake_scheduler = _install_fake_cron(monkeypatch, fake_run_job, recorded_events)
    exec_home = tmp_path / "exec-profile"

    result = routes._run_cron_job_in_profile_subprocess({"id": "job1574"}, exec_home)
    success, output, final_response, error = result

    assert success is True
    assert output == str(exec_home)
    assert final_response == "final"
    assert error is None
    # The parent-side fake module must not have been repointed by the run.
    assert fake_scheduler._hermes_home == Path("/tmp/hermes")
9 changes: 6 additions & 3 deletions tests/test_issue617_cron_profile_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,12 @@ def __exit__(self, exc_type, exc, tb):
cron_scheduler = types.ModuleType("cron.scheduler")
cron_scheduler.run_job = lambda job: events.append(("run", job["id"])) or (True, "output", "final", None)

def fake_subprocess_run(job, execution_profile_home):
events.append(("run", job["id"], str(execution_profile_home)))
return True, "output", "final", None

monkeypatch.setattr(profiles, "cron_profile_context_for_home", Ctx)
monkeypatch.setattr(routes, "_run_cron_job_in_profile_subprocess", fake_subprocess_run)
monkeypatch.setitem(sys.modules, "cron", cron_pkg)
monkeypatch.setitem(sys.modules, "cron.jobs", cron_jobs)
monkeypatch.setitem(sys.modules, "cron.scheduler", cron_scheduler)
Expand All @@ -197,9 +202,7 @@ def __exit__(self, exc_type, exc, tb):
)

assert events == [
("enter", "/hermes/profiles/research"),
("run", "job617"),
("exit", "/hermes/profiles/research"),
("run", "job617", "/hermes/profiles/research"),
("enter", "/hermes/default"),
("save", "job617", "output"),
("mark", "job617", True, None),
Expand Down
Loading
Loading