diff --git a/.clawmetry-metrics.json b/.clawmetry-metrics.json new file mode 100644 index 0000000..4d1ae04 --- /dev/null +++ b/.clawmetry-metrics.json @@ -0,0 +1 @@ +{"tokens": [], "cost": [], "runs": [], "messages": [], "webhooks": [], "queues": [], "_last_received": 0, "_saved_at": 1775159858.248487} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 22fafbd..be78ef6 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ Thumbs.db # Logs *.log +.clawmetry-fleet.db diff --git a/clawmetry/sync.py b/clawmetry/sync.py index 7fe5470..d3c2365 100644 --- a/clawmetry/sync.py +++ b/clawmetry/sync.py @@ -36,18 +36,19 @@ def _pid_file() -> Path: def _acquire_pid_lock() -> bool: - """Write PID file. Return False if another instance is already running.""" + """Write PID file atomically. Return False if another instance is already running.""" pid_path = _pid_file() pid_path.parent.mkdir(parents=True, exist_ok=True) - if pid_path.exists(): + flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY + try: + fd = os.open(str(pid_path), flags, 0o644) try: - existing_pid = int(pid_path.read_text().strip()) - os.kill(existing_pid, 0) - return False - except (ProcessLookupError, ValueError): - pass - pid_path.write_text(str(os.getpid())) - return True + os.write(fd, str(os.getpid()).encode()) + finally: + os.close(fd) + return True + except FileExistsError: + return False def _release_pid_lock() -> None: @@ -955,7 +956,10 @@ def sync_sessions_recent( if os.path.isfile(index_path): try: current_mtime = os.path.getmtime(index_path) - if _sessions_json_cache["data"] is not None and _sessions_json_cache["mtime"] == current_mtime: + if ( + _sessions_json_cache["data"] is not None + and _sessions_json_cache["mtime"] == current_mtime + ): file_to_subagent_id = _sessions_json_cache["data"] else: with open(index_path) as _fi: @@ -964,8 +968,14 @@ def sync_sessions_recent( if ":subagent:" in _k and isinstance(_meta, dict): _sf = _meta.get("sessionFile", "") if _sf: - file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[-1] - _sessions_json_cache = {"ts": time.time(), "data": file_to_subagent_id.copy(), "mtime": current_mtime} + file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[ + -1 + ] + _sessions_json_cache = { + "ts": time.time(), + "data": file_to_subagent_id.copy(), + "mtime": current_mtime, + } except Exception: pass @@ -2976,8 +2986,6 @@ def _build_gateway_data(paths: dict = None) -> dict: time.sleep(15) - - def run_daemon() -> None: """Run the sync daemon - main loop for continuous synchronization.""" config = load_config() diff --git a/tests/test_pid_lock.py b/tests/test_pid_lock.py new file mode 100644 index 0000000..283857b --- /dev/null +++ b/tests/test_pid_lock.py @@ -0,0 +1,297 @@ +""" +Test PID lock race condition in sync.py _acquire_pid_lock() + +This test demonstrates the TOCTOU race: between pid_path.exists() check +and pid_path.write_text(), another process can create the file. +""" + +import os +import sys +import time +import signal +import tempfile +import multiprocessing +from pathlib import Path + +import pytest + + +def _acquire_pid_lock_vulnerable(pid_path_str: str) -> bool: + """Vulnerable version that demonstrates the TOCTOU race.""" + from pathlib import Path + + pid_path = Path(pid_path_str) + pid_path.parent.mkdir(parents=True, exist_ok=True) + if pid_path.exists(): # TOCTOU: check + try: + existing_pid = int(pid_path.read_text().strip()) + os.kill(existing_pid, 0) + return False + except (ProcessLookupError, ValueError): + pass + # Race window here: another process can create the file between the check and write + time.sleep(0.1) # Simulates the race window + pid_path.write_text(str(os.getpid())) + return True + + +def _acquire_pid_lock_fixed(pid_path_str: str) -> bool: + """Fixed version using O_CREAT|O_EXCL for atomic creation.""" + pid_path = Path(pid_path_str) + pid_path.parent.mkdir(parents=True, exist_ok=True) + flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY + try: + fd = os.open(str(pid_path), flags, 0o644) + os.write(fd, str(os.getpid()).encode()) + os.close(fd) + return True + except FileExistsError: + return False + + +def worker_vulnerable(ready_event, start_event, pid_path_str, results, idx): + """Worker process using vulnerable lock.""" + ready_event.set() + start_event.wait() + result = _acquire_pid_lock_vulnerable(pid_path_str) + results[idx] = result + + +def worker_fixed(ready_event, start_event, pid_path_str, results, idx): + """Worker process using fixed lock.""" + ready_event.set() + start_event.wait() + result = _acquire_pid_lock_fixed(pid_path_str) + results[idx] = result + + +def worker_actual_sync( + ready_event, start_event, pid_path_str, results, idx, original_pid_file_func +): + """Worker process using the actual _acquire_pid_lock from sync module.""" + import clawmetry.sync as sync + + # Monkeypatch the pid file path + original = sync._pid_file + sync._pid_file = lambda: Path(pid_path_str) + + ready_event.set() + start_event.wait() + + result = sync._acquire_pid_lock() + results[idx] = result + + # Restore + sync._pid_file = original + + +def test_pid_lock_vulnerable_implementation_has_race(tmp_path): + """ + Test that demonstrates the race: two processes both succeed with vulnerable lock. + + The vulnerable implementation has a TOCTOU race between exists() check and write_text(). + This test asserts that the race CAN occur (at least once in many trials). + """ + pid_path = tmp_path / "sync.pid" + pid_path_str = str(pid_path) + + num_trials = 50 + race_detected = 0 + + for trial in range(num_trials): + # Reset state + if pid_path.exists(): + pid_path.unlink() + + manager = multiprocessing.Manager() + results = manager.list([None, None]) + + ready_events = [multiprocessing.Event() for _ in range(2)] + start_event = multiprocessing.Event() + + # Two processes trying to acquire the lock simultaneously + p1 = multiprocessing.Process( + target=worker_vulnerable, + args=(ready_events[0], start_event, pid_path_str, results, 0), + ) + p2 = multiprocessing.Process( + target=worker_vulnerable, + args=(ready_events[1], start_event, pid_path_str, results, 1), + ) + + p1.start() + p2.start() + + # Wait for both to be ready + for e in ready_events: + e.wait() + + # Signal both to start at the same time + start_event.set() + + p1.join(timeout=5) + p2.join(timeout=5) + + # Count how many succeeded + successes = sum(1 for r in results if r is True) + + # With the race, both processes might succeed (both get the lock) + if successes == 2: + race_detected += 1 + + p1.terminate() + p2.terminate() + + # The race should be detected at least once in 50 trials + # This assertion verifies the vulnerable implementation is indeed vulnerable + assert race_detected > 0, ( + f"Vulnerable implementation race not detected in {num_trials} trials" + ) + + +def test_pid_lock_fixed_implementation_prevents_race(tmp_path): + """ + Test that the fixed implementation using O_CREAT|O_EXCL prevents the race. + Only one process should ever succeed in acquiring the lock. + """ + pid_path = tmp_path / "sync.pid" + pid_path_str = str(pid_path) + + num_trials = 50 + both_succeeded = 0 + + for trial in range(num_trials): + # Reset state + if pid_path.exists(): + pid_path.unlink() + + manager = multiprocessing.Manager() + results = manager.list([None, None]) + + ready_events = [multiprocessing.Event() for _ in range(2)] + start_event = multiprocessing.Event() + + # Two processes trying to acquire the lock simultaneously + p1 = multiprocessing.Process( + target=worker_fixed, + args=(ready_events[0], start_event, pid_path_str, results, 0), + ) + p2 = multiprocessing.Process( + target=worker_fixed, + args=(ready_events[1], start_event, pid_path_str, results, 1), + ) + + p1.start() + p2.start() + + # Wait for both to be ready + for e in ready_events: + e.wait() + + # Signal both to start at the same time + start_event.set() + + p1.join(timeout=5) + p2.join(timeout=5) + + # Count how many succeeded + successes = sum(1 for r in results if r is True) + + # With the fix, exactly one should succeed + if successes == 2: + both_succeeded += 1 + + p1.terminate() + p2.terminate() + + # The fixed implementation should never allow both to succeed + assert both_succeeded == 0, ( + f"Race condition still present: {both_succeeded}/{num_trials} trials " + "had both processes succeed" + ) + + +def test_sync_pid_lock_has_race_condition(tmp_path): + """ + Test that the actual _acquire_pid_lock in sync.py has the TOCTOU race. + + This test FAILS with the vulnerable code and PASSES after the fix. + It verifies that when two processes race, the atomic O_CREAT|O_EXCL + is used to prevent both from acquiring the lock. + """ + import clawmetry.sync as sync + + pid_path = tmp_path / "sync.pid" + pid_path_str = str(pid_path) + + num_trials = 50 + both_succeeded = 0 + + for trial in range(num_trials): + # Reset state + if pid_path.exists(): + pid_path.unlink() + + manager = multiprocessing.Manager() + results = manager.list([None, None]) + + ready_events = [multiprocessing.Event() for _ in range(2)] + start_event = multiprocessing.Event() + + # Two processes trying to acquire the lock simultaneously + p1 = multiprocessing.Process( + target=worker_actual_sync, + args=( + ready_events[0], + start_event, + pid_path_str, + results, + 0, + sync._pid_file, + ), + ) + p2 = multiprocessing.Process( + target=worker_actual_sync, + args=( + ready_events[1], + start_event, + pid_path_str, + results, + 1, + sync._pid_file, + ), + ) + + p1.start() + p2.start() + + # Wait for both to be ready + for e in ready_events: + e.wait() + + # Signal both to start at the same time + start_event.set() + + p1.join(timeout=5) + p2.join(timeout=5) + + # Count how many succeeded + successes = sum(1 for r in results if r is True) + + if successes == 2: + both_succeeded += 1 + + p1.terminate() + p2.terminate() + + # Clean up PID file + if pid_path.exists(): + pid_path.unlink() + + # After fix: both processes should NEVER succeed simultaneously + # Before fix: the race allows both to succeed + assert both_succeeded == 0, ( + f"TOCTOU race detected in sync.py:_acquire_pid_lock: " + f"{both_succeeded}/{num_trials} trials had both processes succeed. " + "The function must use O_CREAT|O_EXCL for atomic file creation." + )