diff --git a/clawmetry/sync.py b/clawmetry/sync.py
index 7fe5470..816dfc9 100644
--- a/clawmetry/sync.py
+++ b/clawmetry/sync.py
@@ -96,6 +96,8 @@ def _validate_log_offsets(state: dict, paths: dict) -> None:
 STATE_FILE = CONFIG_DIR / "sync-state.json"
 LOG_FILE = CONFIG_DIR / "sync.log"
 
+_state_lock = threading.RLock()
+
 POLL_INTERVAL = 15  # seconds between sync cycles
 STREAM_INTERVAL = 2  # seconds between real-time stream pushes
 BATCH_SIZE = (
@@ -233,8 +235,19 @@ def load_state() -> dict:
 
 
 def save_state(state: dict) -> None:
-    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
-    STATE_FILE.write_text(json.dumps(state, indent=2))
+    with _state_lock:
+        CONFIG_DIR.mkdir(parents=True, exist_ok=True)
+        STATE_FILE.write_text(json.dumps(state, indent=2))
+
+
+def update_state(updater):
+    """Atomically load state, apply updater callback, and save. Returns updater result."""
+    with _state_lock:
+        state = load_state()
+        result = updater(state)
+        CONFIG_DIR.mkdir(parents=True, exist_ok=True)
+        STATE_FILE.write_text(json.dumps(state, indent=2))
+        return result
 
 
 # ── HTTP ──────────────────────────────────────────────────────────────────────
@@ -955,7 +968,10 @@ def sync_sessions_recent(
     if os.path.isfile(index_path):
         try:
             current_mtime = os.path.getmtime(index_path)
-            if _sessions_json_cache["data"] is not None and _sessions_json_cache["mtime"] == current_mtime:
+            if (
+                _sessions_json_cache["data"] is not None
+                and _sessions_json_cache["mtime"] == current_mtime
+            ):
                 file_to_subagent_id = _sessions_json_cache["data"]
             else:
                 with open(index_path) as _fi:
@@ -964,8 +980,14 @@
                         if ":subagent:" in _k and isinstance(_meta, dict):
                             _sf = _meta.get("sessionFile", "")
                             if _sf:
-                                file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[-1]
-            _sessions_json_cache = {"ts": time.time(), "data": file_to_subagent_id.copy(), "mtime": current_mtime}
+                                file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[
+                                    -1
+                                ]
+            _sessions_json_cache = {
+                "ts": time.time(),
+                "data": file_to_subagent_id.copy(),
+                "mtime": current_mtime,
+            }
         except Exception:
             pass
 
@@ -2768,18 +2790,23 @@ def _backfill_worker():
     time.sleep(20)
     log.info("Background backfill starting — syncing older sessions...")
     try:
-        bf_state = load_state()
-        ev = sync_sessions(config, bf_state, paths)
-        bf_state["initial_backfill_done"] = True
-        bf_state["last_sync"] = datetime.now(timezone.utc).isoformat()
-        save_state(bf_state)
+
+        def _update_sessions(s):
+            ev = sync_sessions(config, s, paths)
+            s["initial_backfill_done"] = True
+            s["last_sync"] = datetime.now(timezone.utc).isoformat()
+            return ev
+
+        ev = update_state(_update_sessions)
         log.info(f"Background backfill: {ev} older events synced")
     except Exception as e:
         log.warning(f"Background backfill error: {e}")
     try:
-        bf_state = load_state()
-        lg = sync_logs(config, bf_state, paths)
-        save_state(bf_state)
+
+        def _update_logs(s):
+            return sync_logs(config, s, paths)
+
+        lg = update_state(_update_logs)
         log.info(f"Background backfill: {lg} log lines synced")
     except Exception as e:
         log.warning(f"Background backfill log error: {e}")
@@ -2976,8 +3003,6 @@ def _build_gateway_data(paths: dict = None) -> dict:
         time.sleep(15)
 
 
-
-
 def run_daemon() -> None:
     """Run the sync daemon - main loop for continuous synchronization."""
     config = load_config()
diff --git a/tests/test_sync.py b/tests/test_sync.py
new file mode 100644
index 0000000..968f580
--- /dev/null
+++ b/tests/test_sync.py
@@ -0,0 +1,56 @@
+"""Tests for clawmetry.sync — state save race condition."""
+
+import json
+import threading
+
+import pytest
+
+
+@pytest.fixture(autouse=True)
+def temp_openclaw_dir(tmp_path, monkeypatch):
+    """Point OpenClaw dir to a temp directory so tests don't pollute real data."""
+    monkeypatch.setenv("CLAWMETRY_OPENCLAW_DIR", str(tmp_path))
+    return tmp_path
+
+
+def _reload_sync():
+    """Reload sync module to get fresh state after patching."""
+    import importlib
+    from clawmetry import sync
+
+    importlib.reload(sync)
+    return sync
+
+
+class TestStateSaveRace:
+    """Test that concurrent state saves don't cause data corruption."""
+
+    def test_concurrent_atomic_updates_preserve_data(self, tmp_path, monkeypatch):
+        """
+        Using atomic update_state(), concurrent updates are serialized
+        and no data is lost.
+        """
+        sync = _reload_sync()
+
+        state_file = tmp_path / "sync-state.json"
+        monkeypatch.setattr(sync, "STATE_FILE", state_file)
+
+        state_file.write_text(json.dumps({"counter": 0}))
+
+        def writer_thread(iterations=50):
+            for i in range(iterations):
+                sync.update_state(lambda s: s.update(counter=s.get("counter", 0) + 1))
+
+        t1 = threading.Thread(target=writer_thread, args=(50,))
+        t2 = threading.Thread(target=writer_thread, args=(50,))
+
+        t1.start()
+        t2.start()
+        t1.join()
+        t2.join()
+
+        final_state = json.loads(state_file.read_text())
+        assert final_state["counter"] == 100, (
+            f"Expected counter=100, got {final_state['counter']} - "
+            "atomic updates should prevent data loss"
+        )