From 6ae4ff2fe36fb2a4cecce23abefa7479f93f8a15 Mon Sep 17 00:00:00 2001 From: dumko2001 Date: Fri, 3 Apr 2026 11:25:27 +0530 Subject: [PATCH] fix(resource): use context manager for file handles --- clawmetry/sync.py | 427 ++++++++++++++++---------------- tests/test_sync_file_handles.py | 125 ++++++++++ 2 files changed, 344 insertions(+), 208 deletions(-) create mode 100644 tests/test_sync_file_handles.py diff --git a/clawmetry/sync.py b/clawmetry/sync.py index 7fe5470..920b3e0 100644 --- a/clawmetry/sync.py +++ b/clawmetry/sync.py @@ -955,7 +955,10 @@ def sync_sessions_recent( if os.path.isfile(index_path): try: current_mtime = os.path.getmtime(index_path) - if _sessions_json_cache["data"] is not None and _sessions_json_cache["mtime"] == current_mtime: + if ( + _sessions_json_cache["data"] is not None + and _sessions_json_cache["mtime"] == current_mtime + ): file_to_subagent_id = _sessions_json_cache["data"] else: with open(index_path) as _fi: @@ -964,8 +967,14 @@ def sync_sessions_recent( if ":subagent:" in _k and isinstance(_meta, dict): _sf = _meta.get("sessionFile", "") if _sf: - file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[-1] - _sessions_json_cache = {"ts": time.time(), "data": file_to_subagent_id.copy(), "mtime": current_mtime} + file_to_subagent_id[os.path.basename(_sf)] = _k.split(":")[ + -1 + ] + _sessions_json_cache = { + "ts": time.time(), + "data": file_to_subagent_id.copy(), + "mtime": current_mtime, + } except Exception: pass @@ -1726,125 +1735,128 @@ def _build_brain_data(): try: session_name = os.path.basename(fp).split(".")[0][:12] prev_user_ts = None # for duration calculation - for line_raw in open(fp, errors="ignore"): - try: - ev = json.loads(line_raw) + with open(fp, errors="ignore") as _fh: + for line_raw in _fh: + try: + ev = json.loads(line_raw) - # Track user message timestamps for duration calc - if ev.get("type") == "message": - msg_role = (ev.get("message") or {}).get("role", "") - if msg_role == "user": - prev_user_ts = ev.get("timestamp") + # Track user message timestamps for duration calc + if ev.get("type") == "message": + msg_role = (ev.get("message") or {}).get("role", "") + if msg_role == "user": + prev_user_ts = ev.get("timestamp") - if ev.get("type") != "message": - continue - msg = ev.get("message", {}) - role = msg.get("role", "") - if role != "assistant": - continue + if ev.get("type") != "message": + continue + msg = ev.get("message", {}) + role = msg.get("role", "") + if role != "assistant": + continue - usage = msg.get("usage") or ev.get("usage") or {} - if not usage: - continue + usage = msg.get("usage") or ev.get("usage") or {} + if not usage: + continue - ts = ev.get("timestamp", "") - if not ts or today not in ts[:10]: - continue + ts = ev.get("timestamp", "") + if not ts or today not in ts[:10]: + continue - # OpenClaw JSONL format uses: input/output/cacheRead/cacheWrite/cost.total - tok_in = ( - usage.get("input") - or usage.get("inputTokens") - or usage.get("input_tokens") - or 0 - ) - tok_out = ( - usage.get("output") - or usage.get("outputTokens") - or usage.get("output_tokens") - or 0 - ) - cr = ( - usage.get("cacheRead") - or usage.get("cacheReadInputTokens") - or usage.get("cache_read_input_tokens") - or 0 - ) - cw = ( - usage.get("cacheWrite") - or usage.get("cacheCreationInputTokens") - or usage.get("cache_creation_input_tokens") - or 0 - ) + # OpenClaw JSONL format uses: input/output/cacheRead/cacheWrite/cost.total + tok_in = ( + usage.get("input") + or usage.get("inputTokens") + or usage.get("input_tokens") + or 0 + ) + tok_out = ( + usage.get("output") + or usage.get("outputTokens") + or usage.get("output_tokens") + or 0 + ) + cr = ( + usage.get("cacheRead") + or usage.get("cacheReadInputTokens") + or usage.get("cache_read_input_tokens") + or 0 + ) + cw = ( + usage.get("cacheWrite") + or usage.get("cacheCreationInputTokens") + or usage.get("cache_creation_input_tokens") + or 0 + ) - # Use actual cost from usage.cost.total if available, else estimate - cost_obj = usage.get("cost", {}) - if isinstance(cost_obj, dict) and cost_obj.get("total"): - cost = float(cost_obj["total"]) - else: - cost = ( - tok_in * 3 + tok_out * 15 + cr * 0.3 + cw * 3.75 - ) / 1_000_000 - - # Duration: compute from prev user msg timestamp (durationMs rarely stored) - dur_ms = int( - ev.get("durationMs", 0) or ev.get("duration_ms", 0) or 0 - ) - if not dur_ms and prev_user_ts and ts: - try: - from datetime import timezone + # Use actual cost from usage.cost.total if available, else estimate + cost_obj = usage.get("cost", {}) + if isinstance(cost_obj, dict) and cost_obj.get("total"): + cost = float(cost_obj["total"]) + else: + cost = ( + tok_in * 3 + tok_out * 15 + cr * 0.3 + cw * 3.75 + ) / 1_000_000 + + # Duration: compute from prev user msg timestamp (durationMs rarely stored) + dur_ms = int( + ev.get("durationMs", 0) or ev.get("duration_ms", 0) or 0 + ) + if not dur_ms and prev_user_ts and ts: + try: + from datetime import timezone - t1 = datetime.fromisoformat( - prev_user_ts.replace("Z", "+00:00") - ) - t2 = datetime.fromisoformat(ts.replace("Z", "+00:00")) - d = int((t2 - t1).total_seconds() * 1000) - if 0 < d < 300000: - dur_ms = d - except Exception: - pass - - has_thinking = False - tools_used = [] - if isinstance(msg.get("content"), list): - for c in msg["content"]: - if c.get("type") == "thinking": - has_thinking = True - elif c.get("type") == "toolCall": - tn = c.get("name", "") - if tn and tn not in tools_used: - tools_used.append(tn) - - m = msg.get("model") or ev.get("model") or "" - if m and m != "unknown": - model_name = m.split("/")[-1] if "/" in m else m - - total_tokens_in += tok_in - total_tokens_out += tok_out - total_cache_read += cr - total_cache_write += cw - total_cost += cost - total_duration += dur_ms - if has_thinking: - thinking_calls += 1 - if cr > 0: - cache_hit_calls += 1 - - calls.append( - { - "timestamp": ts, - "session": session_name, - "tokens_in": tok_in, - "tokens_out": tok_out, - "cost": "$" + format(cost, ".4f"), - "duration_ms": dur_ms, - "thinking": has_thinking, - "cache_read": cr, - "tools_used": tools_used[:5], - } - ) - except Exception: - continue + t1 = datetime.fromisoformat( + prev_user_ts.replace("Z", "+00:00") + ) + t2 = datetime.fromisoformat( + ts.replace("Z", "+00:00") + ) + d = int((t2 - t1).total_seconds() * 1000) + if 0 < d < 300000: + dur_ms = d + except Exception: + pass + + has_thinking = False + tools_used = [] + if isinstance(msg.get("content"), list): + for c in msg["content"]: + if c.get("type") == "thinking": + has_thinking = True + elif c.get("type") == "toolCall": + tn = c.get("name", "") + if tn and tn not in tools_used: + tools_used.append(tn) + + m = msg.get("model") or ev.get("model") or "" + if m and m != "unknown": + model_name = m.split("/")[-1] if "/" in m else m + + total_tokens_in += tok_in + total_tokens_out += tok_out + total_cache_read += cr + total_cache_write += cw + total_cost += cost + total_duration += dur_ms + if has_thinking: + thinking_calls += 1 + if cr > 0: + cache_hit_calls += 1 + + calls.append( + { + "timestamp": ts, + "session": session_name, + "tokens_in": tok_in, + "tokens_out": tok_out, + "cost": "$" + format(cost, ".4f"), + "duration_ms": dur_ms, + "thinking": has_thinking, + "cache_read": cr, + "tools_used": tools_used[:5], + } + ) + except Exception: + continue except Exception: continue @@ -1923,104 +1935,105 @@ def _build_tool_stats(): for fp in files: _file_channel = _session_channels.get(os.path.basename(fp), "") try: - for line in open(fp, errors="ignore"): - try: - ev = json.loads(line) - if ev.get("type") != "message": - continue - msg = ev.get("message", {}) - ts = ev.get("timestamp", "") - role = msg.get("role", "") - - if isinstance(msg.get("content"), list): - for c in msg["content"]: - if c.get("type") == "toolCall": - name = c.get("name", "?") - tool_counts[name] += 1 - args = ( - c.get("arguments", {}) - or c.get("input", {}) - or c.get("args", {}) - or {} - ) - if isinstance(args, str): - try: - args = json.loads(args) - except: - args = {} - - # Track recent entries for specific tools - if name == "web_search": - q = args.get("query", "") - if q and name not in tool_recent: - tool_recent[name] = [] - if q: + with open(fp, errors="ignore") as _fh: + for line in _fh: + try: + ev = json.loads(line) + if ev.get("type") != "message": + continue + msg = ev.get("message", {}) + ts = ev.get("timestamp", "") + role = msg.get("role", "") + + if isinstance(msg.get("content"), list): + for c in msg["content"]: + if c.get("type") == "toolCall": + name = c.get("name", "?") + tool_counts[name] += 1 + args = ( + c.get("arguments", {}) + or c.get("input", {}) + or c.get("args", {}) + or {} + ) + if isinstance(args, str): + try: + args = json.loads(args) + except: + args = {} + + # Track recent entries for specific tools + if name == "web_search": + q = args.get("query", "") + if q and name not in tool_recent: + tool_recent[name] = [] + if q: + tool_recent.setdefault(name, []).append( + {"query": q[:200], "ts": ts} + ) + elif name == "web_fetch": + url = args.get("url", "") + if url: + tool_recent.setdefault(name, []).append( + {"url": url[:200], "ts": ts} + ) + elif name == "browser": + action = args.get("action", "") + url = args.get("url", "") tool_recent.setdefault(name, []).append( - {"query": q[:200], "ts": ts} + { + "action": action, + "url": url[:200] if url else "", + "ts": ts, + } ) - elif name == "web_fetch": - url = args.get("url", "") - if url: - tool_recent.setdefault(name, []).append( - {"url": url[:200], "ts": ts} + elif name == "exec": + cmd = args.get("command", "") + if cmd: + tool_recent.setdefault(name, []).append( + {"command": cmd[:300], "ts": ts} + ) + elif name == "message": + target = args.get("target", "") or args.get( + "channel", "" ) - elif name == "browser": - action = args.get("action", "") - url = args.get("url", "") - tool_recent.setdefault(name, []).append( - { - "action": action, - "url": url[:200] if url else "", - "ts": ts, - } - ) - elif name == "exec": - cmd = args.get("command", "") - if cmd: tool_recent.setdefault(name, []).append( - {"command": cmd[:300], "ts": ts} + {"target": target, "ts": ts} ) - elif name == "message": - target = args.get("target", "") or args.get( - "channel", "" - ) - tool_recent.setdefault(name, []).append( - {"target": target, "ts": ts} - ) - # Track channel messages (inbound + outbound) - if role in ("user", "assistant"): - text = "" - if isinstance(msg.get("content"), str): - text = msg["content"][:300] - elif isinstance(msg.get("content"), list): - for c in msg["content"]: - if c.get("type") == "text": - text = c.get("text", "")[:300] - break - - # Try to detect channel from metadata, fall back to session-level channel - meta = ev.get("metadata", {}) or {} - channel = ( - meta.get("channel", "") - or meta.get("surface", "") - or _file_channel - ) - if channel and text: - direction = "in" if role == "user" else "out" - channel_msgs[channel][direction] += 1 - channel_msgs[channel]["messages"].append( - { - "direction": direction, - "content": text[:200], - "timestamp": ts, - "sender": meta.get("sender", "User") - if role == "user" - else "Agent", - } + # Track channel messages (inbound + outbound) + if role in ("user", "assistant"): + text = "" + if isinstance(msg.get("content"), str): + text = msg["content"][:300] + elif isinstance(msg.get("content"), list): + for c in msg["content"]: + if c.get("type") == "text": + text = c.get("text", "")[:300] + break + + # Try to detect channel from metadata, fall back to session-level channel + meta = ev.get("metadata", {}) or {} + channel = ( + meta.get("channel", "") + or meta.get("surface", "") + or _file_channel ) - except Exception: - continue + if channel and text: + direction = "in" if role == "user" else "out" + channel_msgs[channel][direction] += 1 + channel_msgs[channel]["messages"].append( + { + "direction": direction, + "content": text[:200], + "timestamp": ts, + "sender": meta.get("sender", "User") + if role == "user" + else "Agent", + } + ) + except Exception: + continue except Exception: continue @@ -2976,8 +2989,6 @@ def _build_gateway_data(paths: dict = None) -> dict: time.sleep(15) - - def run_daemon() -> None: """Run the sync daemon - main loop for continuous synchronization.""" config = load_config() diff --git a/tests/test_sync_file_handles.py b/tests/test_sync_file_handles.py new file mode 100644 index 0000000..a0cf439 --- /dev/null +++ b/tests/test_sync_file_handles.py @@ -0,0 +1,125 @@ +"""Tests for file handle leaks in sync.py.""" + +import json +import os +import sys +import tempfile +import warnings +import unittest +from pathlib import Path +from unittest.mock import patch, MagicMock + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from clawmetry import sync + + +class TestFileHandleLeaks(unittest.TestCase): + """Test that file handles are properly closed using context managers.""" + + def _create_test_jsonl(self, tmpdir, filename, events): + """Create a test JSONL file with given events.""" + filepath = os.path.join(tmpdir, filename) + with open(filepath, "w") as f: + for event in events: + f.write(json.dumps(event) + "\n") + return filepath + + def test_build_brain_data_no_resource_warning(self): + """Test _build_brain_data() does not leak file handles.""" + events = [ + { + "type": "message", + "timestamp": "2024-01-01T12:00:00", + "message": {"role": "user", "content": "Hello"}, + }, + { + "type": "message", + "timestamp": "2024-01-01T12:00:01", + "message": { + "role": "assistant", + "content": "Hi there", + "usage": {"input_tokens": 10, "output_tokens": 20}, + }, + }, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + session_file = self._create_test_jsonl( + tmpdir, "test_session_001.jsonl", events + ) + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always", ResourceWarning) + + with patch.object(sync, "_get_openclaw_dir", return_value=tmpdir): + with patch.object(sync.os.path, "isdir", return_value=True): + with patch.object( + sync.glob, "glob", return_value=[session_file] + ): + result = sync._build_brain_data() + + resource_warnings = [ + x for x in w if issubclass(x.category, ResourceWarning) + ] + self.assertEqual( + len(resource_warnings), + 0, + f"ResourceWarning raised: {[str(w.message) for w in resource_warnings]}", + ) + + def test_build_tool_stats_no_resource_warning(self): + """Test _build_tool_stats() does not leak file handles.""" + events = [ + { + "type": "message", + "timestamp": "2024-01-01T12:00:00", + "message": {"role": "user", "content": "Search the web"}, + }, + { + "type": "message", + "timestamp": "2024-01-01T12:00:01", + "message": { + "role": "assistant", + "content": [ + { + "type": "toolCall", + "name": "web_search", + "arguments": {"query": "test"}, + } + ], + }, + }, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + session_file = self._create_test_jsonl( + tmpdir, "test_session_002.jsonl", events + ) + sessions_json = os.path.join(tmpdir, "sessions.json") + with open(sessions_json, "w") as f: + json.dump( + {"test_session_002": {"sessionFile": "test_session_002.jsonl"}}, f + ) + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always", ResourceWarning) + + with patch.object(sync, "_get_openclaw_dir", return_value=tmpdir): + with patch.object(sync.os.path, "isdir", return_value=True): + with patch.object( + sync.glob, "glob", return_value=[session_file] + ): + result = sync._build_tool_stats() + + resource_warnings = [ + x for x in w if issubclass(x.category, ResourceWarning) + ] + self.assertEqual( + len(resource_warnings), + 0, + f"ResourceWarning raised: {[str(w.message) for w in resource_warnings]}", + ) + + +if __name__ == "__main__": + unittest.main()