Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions clawmetry/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def _validate_log_offsets(state: dict, paths: dict) -> None:
# Persisted daemon state (sync offsets, flags) and the rolling sync log,
# both kept under the user's config directory.
STATE_FILE = CONFIG_DIR / "sync-state.json"
LOG_FILE = CONFIG_DIR / "sync.log"

# Serializes all read-modify-write access to STATE_FILE across threads.
# RLock (not Lock) so the same thread may re-acquire it in nested calls.
_state_lock = threading.RLock()

POLL_INTERVAL = 15  # seconds between sync cycles
STREAM_INTERVAL = 2  # seconds between real-time stream pushes
BATCH_SIZE = (
Expand Down Expand Up @@ -233,8 +235,19 @@ def load_state() -> dict:


def save_state(state: dict) -> None:
    """Persist *state* to STATE_FILE as pretty-printed JSON.

    Runs under _state_lock so concurrent savers are serialized, and writes
    via a sibling temp file followed by os.replace so a crash or kill
    mid-write can never leave a truncated/corrupt state file behind.
    """
    with _state_lock:
        CONFIG_DIR.mkdir(parents=True, exist_ok=True)
        tmp = STATE_FILE.with_suffix(".json.tmp")
        tmp.write_text(json.dumps(state, indent=2))
        # os.replace is atomic on both POSIX and Windows.
        os.replace(tmp, STATE_FILE)


def update_state(updater: callable):
    """Atomically load state, apply *updater* to it, and persist the result.

    The entire read-modify-write cycle is held under _state_lock so
    concurrent updates cannot interleave and lose writes.  Because the
    lock is an RLock, delegating to save_state() (which re-acquires it)
    is safe and keeps the mkdir/serialize logic in one place.

    Args:
        updater: callback receiving the mutable state dict; may mutate it
            in place and may return a value.

    Returns:
        Whatever *updater* returned.
    """
    with _state_lock:
        state = load_state()
        result = updater(state)
        save_state(state)  # single canonical write path (no duplicated I/O)
        return result


# ── HTTP ──────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -955,7 +968,10 @@ def sync_sessions_recent(
if os.path.isfile(index_path):
try:
current_mtime = os.path.getmtime(index_path)
if _sessions_json_cache["data"] is not None and _sessions_json_cache["mtime"] == current_mtime:
if (
_sessions_json_cache["data"] is not None
and _sessions_json_cache["mtime"] == current_mtime
):
file_to_subagent_id = _sessions_json_cache["data"]
else:
with open(index_path) as _fi:
Expand All @@ -964,8 +980,14 @@ def sync_sessions_recent(
if ":subagent:" in _k and isinstance(_meta, dict):
_sf = _meta.get("sessionFile", "")
if _sf:
file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[-1]
_sessions_json_cache = {"ts": time.time(), "data": file_to_subagent_id.copy(), "mtime": current_mtime}
file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[
-1
]
_sessions_json_cache = {
"ts": time.time(),
"data": file_to_subagent_id.copy(),
"mtime": current_mtime,
}
except Exception:
pass

Expand Down Expand Up @@ -2768,18 +2790,23 @@ def _backfill_worker():
time.sleep(20)
log.info("Background backfill starting — syncing older sessions...")
try:
bf_state = load_state()
ev = sync_sessions(config, bf_state, paths)
bf_state["initial_backfill_done"] = True
bf_state["last_sync"] = datetime.now(timezone.utc).isoformat()
save_state(bf_state)

def _update_sessions(s):
ev = sync_sessions(config, s, paths)
s["initial_backfill_done"] = True
s["last_sync"] = datetime.now(timezone.utc).isoformat()
return ev

ev = update_state(_update_sessions)
log.info(f"Background backfill: {ev} older events synced")
except Exception as e:
log.warning(f"Background backfill error: {e}")
try:
bf_state = load_state()
lg = sync_logs(config, bf_state, paths)
save_state(bf_state)

def _update_logs(s):
return sync_logs(config, s, paths)

lg = update_state(_update_logs)
log.info(f"Background backfill: {lg} log lines synced")
except Exception as e:
log.warning(f"Background backfill log error: {e}")
Expand Down Expand Up @@ -2976,8 +3003,6 @@ def _build_gateway_data(paths: dict = None) -> dict:
time.sleep(15)




def run_daemon() -> None:
"""Run the sync daemon - main loop for continuous synchronization."""
config = load_config()
Expand Down
56 changes: 56 additions & 0 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Tests for clawmetry.sync — state save race condition."""

import json
import threading

import pytest


@pytest.fixture(autouse=True)
def temp_openclaw_dir(tmp_path, monkeypatch):
    """Redirect the OpenClaw config dir into pytest's tmp_path for every test."""
    sandbox = str(tmp_path)
    monkeypatch.setenv("CLAWMETRY_OPENCLAW_DIR", sandbox)
    return tmp_path


def _reload_sync():
    """Re-import clawmetry.sync so its module-level state reflects patched env."""
    import importlib

    from clawmetry import sync

    # importlib.reload returns the (same) freshly re-executed module object.
    return importlib.reload(sync)


class TestStateSaveRace:
    """Concurrent state saves must be serialized without losing updates."""

    def test_concurrent_atomic_updates_preserve_data(self, tmp_path, monkeypatch):
        """
        Using atomic update_state(), concurrent updates are serialized
        and no data is lost.
        """
        sync = _reload_sync()

        state_file = tmp_path / "sync-state.json"
        monkeypatch.setattr(sync, "STATE_FILE", state_file)
        state_file.write_text(json.dumps({"counter": 0}))

        def bump(state):
            # In-place increment; update_state persists the mutated dict.
            state["counter"] = state.get("counter", 0) + 1

        def worker(iterations=50):
            for _ in range(iterations):
                sync.update_state(bump)

        threads = [threading.Thread(target=worker, args=(50,)) for _ in range(2)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        final_state = json.loads(state_file.read_text())
        assert final_state["counter"] == 100, (
            f"Expected counter=100, got {final_state['counter']} - "
            "atomic updates should prevent data loss"
        )
Loading