From fe9902dfadb8920256741955d8864f6239c8d625 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Fri, 9 Jan 2026 19:20:01 +0100 Subject: [PATCH 01/25] feat(watch): Enhanced SSE monitoring with detailed event data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit STORE events now include: - Full content (not truncated) - All tags and metadata - Type confidence, embedding/qdrant status RECALL events now include: - Full query text - Aggregate stats (avg length, avg tags, score range) - Top 3 result summaries with type/score/length/tags - Dedup count ENRICHMENT events now include: - Entity counts by category (tools, people, projects, etc.) - Relationship counts (temporal, semantic, patterns) - Entity tags added count - Summary preview Also: - Simplified automem_watch.py to tail-style streaming log - Added reconnect counter display - Refactored enrich_memory() to return detailed dict instead of bool 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- app.py | 110 +++++-- scripts/automem_watch.py | 615 +++++++++++++++++---------------------- 2 files changed, 357 insertions(+), 368 deletions(-) diff --git a/app.py b/app.py index 78b0039..3b9d93a 100644 --- a/app.py +++ b/app.py @@ -1753,21 +1753,35 @@ def enrichment_worker() -> None: ) try: - processed = enrich_memory(job.memory_id, forced=job.forced) + result = enrich_memory(job.memory_id, forced=job.forced) state.enrichment_stats.record_success(job.memory_id) elapsed_ms = int((time.perf_counter() - enrich_start) * 1000) + + # Build entity counts for display + entities = result.get("entities", {}) + entity_counts = {cat: len(vals) for cat, vals in entities.items() if vals} + emit_event( "enrichment.complete", { "memory_id": job.memory_id, - "success": True, "elapsed_ms": elapsed_ms, - "skipped": not processed, + "skipped": not result.get("processed", False), + "skip_reason": result.get("reason"), + # New detailed fields: + "entities": entity_counts, + "temporal_links": result.get("temporal_links", 0), + "patterns_detected": len(result.get("patterns_detected", [])), + "semantic_neighbors": result.get("semantic_neighbors", 0), + "summary_preview": result.get("summary", "")[:80], + "entity_tags_added": result.get("entity_tags_added", 0), }, utc_now, ) - if not processed: - logger.debug("Enrichment skipped for %s (already processed)", job.memory_id) + if not result.get("processed"): + logger.debug( + "Enrichment skipped for %s (%s)", job.memory_id, result.get("reason") + ) except Exception as exc: # pragma: no cover - background thread state.enrichment_stats.record_failure(str(exc)) elapsed_ms = int((time.perf_counter() - enrich_start) * 1000) @@ -2071,8 +2085,18 @@ def _run_sync_check() -> None: logger.exception("Sync check failed") -def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: - """Enrich a memory with relationships, patterns, and entity extraction.""" +def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]: + """Enrich a memory with relationships, patterns, and entity extraction. 
+ + Returns a dict with enrichment details: + - processed: True if enrichment was performed + - entities: Dict of extracted entities by category + - temporal_links: Count of temporal relationships created + - patterns_detected: List of detected pattern info + - semantic_neighbors: Count of semantic neighbor links + - summary: Generated summary (truncated) + - entity_tags_added: Count of entity tags added + """ graph = get_memory_graph() if graph is None: raise RuntimeError("FalkorDB unavailable for enrichment") @@ -2081,7 +2105,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: if not result.result_set: logger.debug("Skipping enrichment for %s; memory not found", memory_id) - return False + return {"processed": False, "reason": "memory_not_found"} node = result.result_set[0][0] properties = getattr(node, "properties", None) @@ -2095,7 +2119,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: already_processed = bool(properties.get("processed")) if already_processed and not forced: - return False + return {"processed": False, "reason": "already_processed"} content = properties.get("content", "") or "" entities = extract_entities(content) @@ -2194,7 +2218,15 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: len(semantic_neighbors), ) - return True + return { + "processed": True, + "entities": entities, + "temporal_links": temporal_links, + "patterns_detected": pattern_info, + "semantic_neighbors": len(semantic_neighbors), + "summary": (summary or "")[:100], + "entity_tags_added": len(entity_tags), + } def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> int: @@ -2670,13 +2702,16 @@ def store_memory() -> Any: emit_event( "memory.store", { - "id": memory_id, - "content_preview": content[:100] + "..." 
if len(content) > 100 else content, + "memory_id": memory_id, + "content": content, # Full content "type": memory_type, + "type_confidence": type_confidence, "importance": importance, - "tags": tags[:5], - "size_bytes": len(content), - "elapsed_ms": int(response["query_time_ms"]), + "tags": tags, # All tags + "metadata": metadata, # Full metadata + "embedding_status": embedding_status, + "qdrant_status": qdrant_result, + "elapsed_ms": round(response["query_time_ms"], 2), }, utc_now, ) @@ -2952,23 +2987,60 @@ def recall_memories() -> Any: # Emit SSE event for real-time monitoring elapsed_ms = int((time.perf_counter() - query_start) * 1000) result_count = 0 + dedup_removed = 0 + result_summaries: List[Dict[str, Any]] = [] try: # Response is either a tuple (response, status) or Response object resp_data = response[0] if isinstance(response, tuple) else response if hasattr(resp_data, "get_json"): data = resp_data.get_json(silent=True) or {} - result_count = len(data.get("memories", [])) + results = data.get("results", []) + result_count = len(results) + dedup_removed = data.get("dedup_removed", 0) + + # Build result summaries for top 10 results + for r in results[:10]: + mem = r.get("memory", {}) + result_summaries.append( + { + "id": str(r.get("id", ""))[:8], + "score": round(float(r.get("final_score", 0)), 3), + "type": mem.get("type", "?"), + "content_length": len(mem.get("content", "")), + "tags_count": len(mem.get("tags", [])), + } + ) except Exception as e: logger.debug("Failed to parse response for result_count", exc_info=e) + # Compute aggregate stats + avg_length = 0 + avg_tags = 0 + score_min = 0 + score_max = 0 + if result_summaries: + avg_length = int(sum(r["content_length"] for r in result_summaries) / len(result_summaries)) + avg_tags = round(sum(r["tags_count"] for r in result_summaries) / len(result_summaries), 1) + scores = [r["score"] for r in result_summaries] + score_min = min(scores) + score_max = max(scores) + emit_event( "memory.recall", { - "query": query_text[:50] if query_text else "(no query)", - "limit": limit, + "query": query_text, # Full query "result_count": result_count, + "dedup_removed": dedup_removed, "elapsed_ms": elapsed_ms, - "tags": tag_filters[:3] if tag_filters else [], + "tags_filter": tag_filters if tag_filters else [], + "has_time_filter": bool(start_time or end_time), + "vector_search": bool(query_text or embedding_param), + "stats": { + "avg_length": avg_length, + "avg_tags": avg_tags, + "score_range": [score_min, score_max], + }, + "result_summaries": result_summaries[:3], # Top 3 for display }, utc_now, ) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index 42f8fa5..c7c8e7d 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -1,394 +1,311 @@ #!/usr/bin/env python3 -"""AutoMem real-time monitor - Terminal UI for observing memory service activity. +"""AutoMem tail - Simple streaming log of memory operations. 
Usage: - python scripts/automem_watch.py --url https://automem.railway.app --token $AUTOMEM_API_TOKEN + python scripts/automem_watch.py --url https://automem.up.railway.app --token $AUTOMEM_API_TOKEN python scripts/automem_watch.py --url http://localhost:8001 --token dev - -Features: - - Real-time SSE event streaming - - Garbage pattern detection (short content, test data, duplicates, bursts) - - Consolidation monitoring with timing - - Auto-reconnect on connection loss """ import argparse -import hashlib import json import sys -import threading +import textwrap import time -from collections import Counter, deque -from datetime import datetime -from typing import Deque, Dict, List, Optional, Set try: import httpx from rich.console import Console - from rich.layout import Layout - from rich.live import Live - from rich.panel import Panel - from rich.table import Table - from rich.text import Text + from rich.syntax import Syntax except ImportError: print("Required dependencies missing. Install with:") print(" pip install rich httpx") sys.exit(1) +console = Console() -class GarbageDetector: - """Detect suspicious patterns in memory stores.""" - - SUSPICIOUS_KEYWORDS = { - "test", - "asdf", - "xxx", - "lorem", - "foo", - "bar", - "baz", - "debug", - "tmp", - "placeholder", - "example", - "sample", - } - - def __init__(self, min_content_length: int = 10, burst_threshold: int = 5): - self.min_content_length = min_content_length - self.burst_threshold = burst_threshold - self.recent_hashes: Deque[str] = deque(maxlen=100) - self.warnings: Deque[str] = deque(maxlen=50) - self.counts: Counter = Counter() - self.burst_times: Deque[float] = deque(maxlen=20) - - def analyze(self, event: Dict) -> Optional[str]: - """Analyze a memory.store event for garbage patterns.""" - if event.get("type") != "memory.store": - return None - - data = event.get("data", {}) - content = data.get("content_preview", "") - now = time.time() - - # Check: Burst detection (>N stores in 10 seconds) - self.burst_times.append(now) - recent_count = sum(1 for t in self.burst_times if now - t < 10) - if recent_count >= self.burst_threshold: - warning = f"Burst detected: {recent_count} stores in 10s" - self._record(warning, "burst") - # Don't return - continue checking other patterns - - # Check: Very short content - if len(content) < self.min_content_length: - warning = f"Short ({len(content)} chars): '{content[:30]}'" - self._record(warning, "short_content") - return warning - - # Check: Test/debug keywords - content_lower = content.lower() - for kw in self.SUSPICIOUS_KEYWORDS: - if kw in content_lower and len(content) < 50: - warning = f"Test keyword '{kw}': '{content[:40]}...'" - self._record(warning, "test_keyword") - return warning - - # Check: No tags - tags = data.get("tags", []) - if not tags: - warning = f"No tags: '{content[:40]}...'" - self._record(warning, "no_tags") - return warning - - # Check: Very low importance with generic content - importance = data.get("importance", 0.5) - if importance < 0.3 and data.get("type") == "Memory": - warning = f"Low importance ({importance}): '{content[:40]}...'" - self._record(warning, "low_importance") - return warning - - # Check: Duplicate content (hash-based) - content_hash = hashlib.md5(content.encode()).hexdigest()[:8] - if content_hash in self.recent_hashes: - warning = f"Duplicate hash: '{content[:40]}...'" - self._record(warning, "duplicate") - return warning - self.recent_hashes.append(content_hash) - - return None - - def _record(self, warning: str, category: str) -> None: 
- ts = datetime.now().strftime("%H:%M:%S") - self.warnings.appendleft(f"[{ts}] {warning}") - self.counts[category] += 1 - - -class AutoMemMonitor: - """Real-time monitor for AutoMem service.""" - - def __init__(self, url: str, token: str, min_content_length: int = 10): - self.url = url.rstrip("/") - self.token = token - self.events: Deque[Dict] = deque(maxlen=100) - self.errors: Deque[str] = deque(maxlen=20) - self.stats = { - "stores": 0, - "recalls": 0, - "enriched": 0, - "consolidated": 0, - "errors": 0, - } - self.garbage = GarbageDetector(min_content_length) - self.start_time = datetime.now() - self.connected = False - self.last_event_time: Optional[datetime] = None - self.consolidation_history: Deque[Dict] = deque(maxlen=10) - - def connect(self): - """Connect to SSE stream and process events.""" - headers = {"Authorization": f"Bearer {self.token}"} - - with httpx.Client(timeout=None) as client: - with client.stream("GET", f"{self.url}/stream", headers=headers) as response: - if response.status_code != 200: - raise Exception(f"HTTP {response.status_code}: {response.text[:100]}") - self.connected = True - for line in response.iter_lines(): - if line.startswith("data: "): - try: - event = json.loads(line[6:]) - self._process_event(event) - except json.JSONDecodeError: - pass - elif line.startswith(":"): - # Keepalive comment - ignore - pass - - def _process_event(self, event: Dict) -> None: - """Process an incoming event.""" - self.last_event_time = datetime.now() - self.events.appendleft(event) - - event_type = event.get("type", "") - - # Update stats - if event_type == "memory.store": - self.stats["stores"] += 1 - # Check for garbage - warning = self.garbage.analyze(event) - if warning: - ts = datetime.now().strftime("%H:%M:%S") - self.errors.appendleft(f"[{ts}] [GARBAGE] {warning}") - elif event_type == "memory.recall": - self.stats["recalls"] += 1 - elif event_type == "enrichment.complete": - self.stats["enriched"] += 1 - elif event_type == "enrichment.failed": - self.stats["errors"] += 1 - ts = datetime.now().strftime("%H:%M:%S") - err = event.get("data", {}).get("error", "unknown")[:50] - self.errors.appendleft(f"[{ts}] [ENRICH] {err}") - elif event_type == "consolidation.run": - self.stats["consolidated"] += 1 - self.consolidation_history.appendleft(event.get("data", {})) - elif event_type == "error": - self.stats["errors"] += 1 - ts = datetime.now().strftime("%H:%M:%S") - err = event.get("data", {}).get("error", "unknown")[:50] - self.errors.appendleft(f"[{ts}] [ERROR] {err}") - - def render(self) -> Layout: - """Render the TUI layout.""" - layout = Layout() - - layout.split_column( - Layout(name="header", size=3), - Layout(name="main", ratio=2), - Layout(name="bottom", size=14), - ) - layout["main"].split_row( - Layout(name="events", ratio=2), - Layout(name="consolidation", ratio=1), - ) +def format_timestamp(ts: str) -> str: + """Extract just the time from ISO timestamp.""" + if "T" in ts: + return ts.split("T")[1][:12] + return ts[:12] - layout["bottom"].split_row( - Layout(name="stats", ratio=1), - Layout(name="garbage", ratio=1), - Layout(name="errors", ratio=1), - ) - # Header - uptime = datetime.now() - self.start_time - uptime_str = ( - f"{int(uptime.total_seconds() // 3600)}h {int((uptime.total_seconds() % 3600) // 60)}m" +def print_store_event(event: dict) -> None: + """Print a memory.store event with full content.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + console.print() + console.print(f"[bold green]━━━ STORE[/] 
[dim]{ts}[/]") + + # Memory info line + mem_id = data.get("memory_id", "?")[:8] + mem_type = data.get("type", "Memory") + type_conf = data.get("type_confidence", 0) + importance = data.get("importance", 0.5) + elapsed = data.get("elapsed_ms", 0) + + console.print( + f" [cyan]id:[/] {mem_id} " + f"[cyan]type:[/] {mem_type} ({type_conf:.2f}) " + f"[cyan]importance:[/] {importance} " + f"[dim]{elapsed}ms[/]" + ) + + # Tags + tags = data.get("tags", []) + if tags: + console.print(f" [cyan]tags:[/] {', '.join(tags)}") + + # Metadata + metadata = data.get("metadata", {}) + if metadata: + meta_preview = ", ".join(f"{k}={v}" for k, v in list(metadata.items())[:5]) + if len(metadata) > 5: + meta_preview += f", ... (+{len(metadata) - 5} more)" + console.print(f" [cyan]metadata:[/] {{{meta_preview}}}") + + # Embedding status + emb_status = data.get("embedding_status", "") + qdrant_status = data.get("qdrant_status", "") + if emb_status or qdrant_status: + console.print(f" [cyan]embedding:[/] {emb_status} [cyan]qdrant:[/] {qdrant_status}") + + # Full content + content = data.get("content", "") + if content: + console.print(" [cyan]content:[/]") + # Wrap long content + wrapped = textwrap.fill(content, width=100, initial_indent=" ", subsequent_indent=" ") + console.print(f"[white]{wrapped}[/]") + + +def print_recall_event(event: dict) -> None: + """Print a memory.recall event with query details and result summaries.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + console.print() + console.print(f"[bold cyan]━━━ RECALL[/] [dim]{ts}[/]") + + # Query info + query = data.get("query", "") + result_count = data.get("result_count", 0) + dedup = data.get("dedup_removed", 0) + elapsed = data.get("elapsed_ms", 0) + + dedup_str = f" (dedup: {dedup})" if dedup else "" + console.print(f' [yellow]query:[/] "{query}"') + console.print(f" [yellow]results:[/] {result_count}{dedup_str} [dim]{elapsed}ms[/]") + + # Filters + filters = [] + if data.get("has_time_filter"): + filters.append("time") + if data.get("tags_filter"): + filters.append(f"tags({len(data['tags_filter'])})") + if data.get("vector_search"): + filters.append("vector") + if filters: + console.print(f" [yellow]filters:[/] {', '.join(filters)}") + + # Stats + stats = data.get("stats", {}) + if stats: + avg_len = stats.get("avg_length", 0) + avg_tags = stats.get("avg_tags", 0) + score_range = stats.get("score_range", [0, 0]) + console.print( + f" [yellow]stats:[/] avg_len={avg_len} avg_tags={avg_tags} " + f"score_range={score_range[0]:.2f}-{score_range[1]:.2f}" ) - status = "[green]Connected[/]" if self.connected else "[red]Disconnected[/]" - last_event = "" - if self.last_event_time: - ago = (datetime.now() - self.last_event_time).total_seconds() - if ago < 60: - last_event = f" | Last event: {int(ago)}s ago" - else: - last_event = f" | Last event: {int(ago // 60)}m ago" - - layout["header"].update( - Panel( - f"[bold]AutoMem Watch[/] | {self.url} | {status} | Uptime: {uptime_str}{last_event}", - style="bold white on blue", + + # Top results + summaries = data.get("result_summaries", []) + if summaries: + console.print(" [yellow]top results:[/]") + for i, r in enumerate(summaries[:3], 1): + console.print( + f" #{i} [{r.get('type', '?'):8s}] " + f"score={r.get('score', 0):.2f} " + f"len={r.get('content_length', 0)} " + f"tags={r.get('tags_count', 0)}" ) - ) - # Events table - events_table = Table(show_header=True, header_style="bold", expand=True, show_lines=False) - events_table.add_column("Time", width=8) - 
events_table.add_column("Type", width=18) - events_table.add_column("Details", overflow="ellipsis") - - for event in list(self.events)[:15]: - ts = event.get("timestamp", "") - if "T" in ts: - ts = ts.split("T")[1][:8] - event_type = event.get("type", "unknown") - data = event.get("data", {}) - - # Format based on event type - if event_type == "memory.store": - preview = data.get("content_preview", "")[:40] - details = f"{preview}... ({data.get('type', '?')})" - type_style = "green" - elif event_type == "memory.recall": - query = data.get("query", "")[:30] - details = ( - f"'{query}' -> {data.get('result_count', 0)} ({data.get('elapsed_ms', 0)}ms)" - ) - type_style = "cyan" - elif event_type == "enrichment.start": - details = f"{data.get('memory_id', '')[:8]}... attempt {data.get('attempt', 1)}" - type_style = "yellow" - elif event_type == "enrichment.complete": - status = "skipped" if data.get("skipped") else "done" - details = ( - f"{data.get('memory_id', '')[:8]}... {status} ({data.get('elapsed_ms', 0)}ms)" - ) - type_style = "green" - elif event_type == "enrichment.failed": - details = f"{data.get('memory_id', '')[:8]}... {data.get('error', '')[:30]}" - type_style = "red" - elif event_type == "consolidation.run": - details = f"{data.get('task_type', '?')} - {data.get('affected_count', 0)} affected ({data.get('elapsed_ms', 0)}ms)" - type_style = "magenta" - else: - details = str(data)[:50] - type_style = "white" - - events_table.add_row(ts, f"[{type_style}]{event_type}[/]", details) - - layout["events"].update(Panel(events_table, title="Events (latest 15)")) - - # Consolidation panel - consol_text = Text() - if self.consolidation_history: - for run in list(self.consolidation_history)[:5]: - task = run.get("task_type", "?") - affected = run.get("affected_count", 0) - elapsed = run.get("elapsed_ms", 0) - success = "[green]OK[/]" if run.get("success") else "[red]FAIL[/]" - next_run = run.get("next_scheduled", "?") - consol_text.append(f"{task}: ", style="bold") - consol_text.append(f"{affected} affected, {elapsed}ms ") - consol_text.append_markup(success) - consol_text.append(f"\n Next: {next_run}\n", style="dim") - else: - consol_text.append("No consolidation runs yet", style="dim") - layout["consolidation"].update(Panel(consol_text, title="Consolidation")) - - # Stats panel - stats_text = Text() - stats_text.append(f"Stores: {self.stats['stores']}\n", style="green") - stats_text.append(f"Recalls: {self.stats['recalls']}\n", style="cyan") - stats_text.append(f"Enriched: {self.stats['enriched']}\n", style="yellow") - stats_text.append(f"Consolidated: {self.stats['consolidated']}\n", style="magenta") - stats_text.append(f"Errors: {self.stats['errors']}\n", style="red") - layout["stats"].update(Panel(stats_text, title="Stats")) - - # Garbage detector panel - garbage_text = Text() - if self.garbage.counts: - for category, count in self.garbage.counts.most_common(5): - garbage_text.append(f"{category}: {count}\n", style="yellow") - garbage_text.append("\n") - for warning in list(self.garbage.warnings)[:3]: - garbage_text.append(f"{warning[:45]}...\n", style="dim yellow") - else: - garbage_text.append("No suspicious patterns", style="green") - layout["garbage"].update(Panel(garbage_text, title="Garbage Detector")) - - # Errors panel - errors_text = Text() - if self.errors: - for err in list(self.errors)[:5]: - errors_text.append(f"{err[:50]}\n", style="red") + +def print_enrichment_event(event: dict) -> None: + """Print enrichment events with details.""" + data = event.get("data", {}) + ts = 
format_timestamp(event.get("timestamp", "")) + event_type = event.get("type", "") + + mem_id = data.get("memory_id", "?")[:8] + + if event_type == "enrichment.start": + attempt = data.get("attempt", 1) + console.print(f"[dim]{ts}[/] [yellow]ENRICH[/] {mem_id} attempt {attempt}") + + elif event_type == "enrichment.complete": + elapsed = data.get("elapsed_ms", 0) + + if data.get("skipped"): + reason = data.get("skip_reason", "") + console.print(f"[dim]{ts}[/] [yellow]ENRICH[/] {mem_id} skipped ({reason})") else: - errors_text.append("No errors", style="green") - layout["errors"].update(Panel(errors_text, title="Errors")) + console.print(f"[dim]{ts}[/] [green]ENRICH[/] {mem_id} done ({elapsed}ms)") + + # Entity counts + entities = data.get("entities", {}) + if entities: + entity_parts = [f"{k}={v}" for k, v in entities.items()] + console.print(f" [dim]entities:[/] {' '.join(entity_parts)}") + + # Links + temporal = data.get("temporal_links", 0) + semantic = data.get("semantic_neighbors", 0) + patterns = data.get("patterns_detected", 0) + entity_tags = data.get("entity_tags_added", 0) + + if temporal or semantic or patterns: + console.print( + f" [dim]links:[/] temporal={temporal} semantic={semantic} patterns={patterns}" + ) + + if entity_tags: + console.print(f" [dim]entity_tags_added:[/] {entity_tags}") + + # Summary preview + summary = data.get("summary_preview", "") + if summary: + console.print(f' [dim]summary:[/] "{summary}"') + + elif event_type == "enrichment.failed": + error = data.get("error", "unknown")[:80] + attempt = data.get("attempt", 1) + will_retry = data.get("will_retry", False) + retry_str = " (will retry)" if will_retry else "" + console.print( + f"[dim]{ts}[/] [red]ENRICH FAIL[/] {mem_id} attempt {attempt}: {error}{retry_str}" + ) + + +def print_consolidation_event(event: dict) -> None: + """Print consolidation events.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + task_type = data.get("task_type", "?") + affected = data.get("affected_count", 0) + elapsed = data.get("elapsed_ms", 0) + success = data.get("success", True) + + status = "[green]OK[/]" if success else "[red]FAIL[/]" + console.print( + f"[dim]{ts}[/] [magenta]CONSOLIDATE[/] " + f"{task_type}: {affected} affected ({elapsed}ms) {status}" + ) + + +def print_error_event(event: dict) -> None: + """Print error events.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + error = data.get("error", "unknown") + console.print(f"[dim]{ts}[/] [bold red]ERROR[/] {error}") + + +def print_raw_event(event: dict) -> None: + """Print any other event as JSON.""" + ts = format_timestamp(event.get("timestamp", "")) + event_type = event.get("type", "unknown") + + console.print(f"[dim]{ts}[/] [blue]{event_type}[/]") + # Print data as formatted JSON + data = event.get("data", {}) + if data: + json_str = json.dumps(data, indent=2, default=str) + syntax = Syntax(json_str, "json", theme="monokai", line_numbers=False) + console.print(syntax) + + +def process_event(event: dict) -> None: + """Route event to appropriate printer.""" + event_type = event.get("type", "") + + if event_type == "memory.store": + print_store_event(event) + elif event_type == "memory.recall": + print_recall_event(event) + elif event_type.startswith("enrichment."): + print_enrichment_event(event) + elif event_type == "consolidation.run": + print_consolidation_event(event) + elif event_type == "error": + print_error_event(event) + else: + print_raw_event(event) + + +def stream_events(url: 
str, token: str) -> None: + """Connect to SSE stream and print events.""" + headers = {"Authorization": f"Bearer {token}"} + reconnect_count = 0 + + console.print(f"[bold]Connecting to {url}/stream...[/]") + console.print("[dim]Press Ctrl+C to stop[/]") + console.print() + + while True: + try: + with httpx.Client(timeout=None) as client: + with client.stream("GET", f"{url}/stream", headers=headers) as resp: + if resp.status_code != 200: + console.print(f"[red]HTTP {resp.status_code}[/]") + break + + if reconnect_count > 0: + console.print(f"[green]Connected[/] [dim](reconnect #{reconnect_count})[/]") + else: + console.print("[green]Connected[/]") + + for line in resp.iter_lines(): + if line.startswith("data: "): + try: + event = json.loads(line[6:]) + process_event(event) + except json.JSONDecodeError: + pass + elif line.startswith(":"): + # Keepalive - ignore silently + pass - return layout + except KeyboardInterrupt: + console.print("\n[bold]Disconnected.[/]") + break + except Exception as e: + reconnect_count += 1 + console.print(f"[red]Connection error:[/] {e}") + console.print(f"[dim]Reconnecting in 5s... (attempt #{reconnect_count})[/]") + time.sleep(5) def main(): parser = argparse.ArgumentParser( - description="AutoMem real-time monitor", + description="AutoMem tail - stream memory operations", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Local development python scripts/automem_watch.py --url http://localhost:8001 --token dev - - # Against Railway - python scripts/automem_watch.py --url https://automem.railway.app --token $AUTOMEM_API_TOKEN + python scripts/automem_watch.py --url https://automem.up.railway.app --token $TOKEN """, ) parser.add_argument("--url", required=True, help="AutoMem API URL") parser.add_argument("--token", required=True, help="API token") - parser.add_argument( - "--min-content-length", - type=int, - default=10, - help="Minimum content length before flagging as garbage (default: 10)", - ) args = parser.parse_args() - console = Console() - monitor = AutoMemMonitor(args.url, args.token, args.min_content_length) - - console.print(f"[bold]Connecting to {args.url}/stream...[/]") - - def connect_loop(): - while True: - try: - monitor.connect() - except KeyboardInterrupt: - break - except Exception as e: - monitor.connected = False - ts = datetime.now().strftime("%H:%M:%S") - monitor.errors.appendleft(f"[{ts}] [CONN] {str(e)[:50]}") - time.sleep(5) # Reconnect delay - - thread = threading.Thread(target=connect_loop, daemon=True) - thread.start() - - # Give connection time to establish - time.sleep(1) - - with Live(monitor.render(), console=console, refresh_per_second=2) as live: - try: - while True: - live.update(monitor.render()) - time.sleep(0.5) - except KeyboardInterrupt: - console.print("\n[bold]Disconnected.[/]") + stream_events(args.url.rstrip("/"), args.token) if __name__ == "__main__": From c30abcd87276b8174a8851908e2e018746f137fc Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Fri, 9 Jan 2026 19:48:41 +0100 Subject: [PATCH 02/25] feat(stream): Add memory.associate event logging Emit SSE events when memories are associated, showing: - Source and target memory IDs - Relation type (RELATES_TO, LEADS_TO, etc.) 
- Association strength Co-Authored-By: Claude Opus 4.5 --- app.py | 11 +++++++++++ scripts/automem_watch.py | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/app.py b/app.py index 3b9d93a..e555836 100644 --- a/app.py +++ b/app.py @@ -3123,6 +3123,17 @@ def create_association() -> Any: if prop in relationship_props: response[prop] = relationship_props[prop] + emit_event( + "memory.associate", + { + "memory1_id": memory1_id, + "memory2_id": memory2_id, + "relation_type": relation_type, + "strength": strength, + }, + utc_now, + ) + return jsonify(response), 201 diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index c7c8e7d..1f8ad4f 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -216,6 +216,23 @@ def print_error_event(event: dict) -> None: console.print(f"[dim]{ts}[/] [bold red]ERROR[/] {error}") +def print_associate_event(event: dict) -> None: + """Print a memory.associate event.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + mem1 = data.get("memory1_id", "?")[:8] + mem2 = data.get("memory2_id", "?")[:8] + rel_type = data.get("relation_type", "?") + strength = data.get("strength", 0.5) + + console.print( + f"[dim]{ts}[/] [bold blue]ASSOCIATE[/] " + f"{mem1} [cyan]--{rel_type}-->[/] {mem2} " + f"[dim]strength={strength}[/]" + ) + + def print_raw_event(event: dict) -> None: """Print any other event as JSON.""" ts = format_timestamp(event.get("timestamp", "")) @@ -238,6 +255,8 @@ def process_event(event: dict) -> None: print_store_event(event) elif event_type == "memory.recall": print_recall_event(event) + elif event_type == "memory.associate": + print_associate_event(event) elif event_type.startswith("enrichment."): print_enrichment_event(event) elif event_type == "consolidation.run": From e58288828b205db58d85d2eaa5735df5de41acf6 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Fri, 9 Jan 2026 19:59:40 +0100 Subject: [PATCH 03/25] fix(test): Update enrichment test for dict return type enrich_memory() now returns a dict instead of bool, so check result.get("processed") instead of direct bool comparison. Co-Authored-By: Claude Opus 4.5 --- tests/test_enrichment.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_enrichment.py b/tests/test_enrichment.py index 8458deb..e8e88c5 100644 --- a/tests/test_enrichment.py +++ b/tests/test_enrichment.py @@ -108,8 +108,8 @@ def test_enrich_memory_updates_metadata(monkeypatch): fake_graph = FakeGraph() app.state.memory_graph = fake_graph - processed = app.enrich_memory("mem-1", forced=True) - assert processed is True + result = app.enrich_memory("mem-1", forced=True) + assert result.get("processed") is True assert fake_graph.temporal_calls, "Should create temporal relationships" assert fake_graph.pattern_calls, "Should update pattern nodes" From 03a3eac9e1fd4245d91dfa6a28d53dc821756b58 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Fri, 9 Jan 2026 21:11:32 +0100 Subject: [PATCH 04/25] feat(stream): Enhanced enrichment events with full context Enrichment events now include: - Full memory content - Tags before/after enrichment (with delta) - Entities by category (tools, people, projects, etc.) 
- Temporal link IDs (not just count) - Semantic neighbor IDs with similarity scores - Pattern detection details - Generated summary Changes: - enrich_memory() now returns detailed dict with before/after state - find_temporal_relationships() returns List[str] instead of count - automem_watch.py displays enhanced enrichment output Co-Authored-By: Claude Opus 4.5 --- app.py | 69 ++++++++++++++++++++-------------- scripts/automem_watch.py | 81 ++++++++++++++++++++++++++++------------ tests/test_enrichment.py | 2 +- 3 files changed, 99 insertions(+), 53 deletions(-) diff --git a/app.py b/app.py index e555836..1be67f8 100644 --- a/app.py +++ b/app.py @@ -1757,10 +1757,6 @@ def enrichment_worker() -> None: state.enrichment_stats.record_success(job.memory_id) elapsed_ms = int((time.perf_counter() - enrich_start) * 1000) - # Build entity counts for display - entities = result.get("entities", {}) - entity_counts = {cat: len(vals) for cat, vals in entities.items() if vals} - emit_event( "enrichment.complete", { @@ -1768,13 +1764,16 @@ def enrichment_worker() -> None: "elapsed_ms": elapsed_ms, "skipped": not result.get("processed", False), "skip_reason": result.get("reason"), - # New detailed fields: - "entities": entity_counts, - "temporal_links": result.get("temporal_links", 0), - "patterns_detected": len(result.get("patterns_detected", [])), - "semantic_neighbors": result.get("semantic_neighbors", 0), - "summary_preview": result.get("summary", "")[:80], - "entity_tags_added": result.get("entity_tags_added", 0), + # Full enrichment data: + "content": result.get("content", ""), + "tags_before": result.get("tags_before", []), + "tags_after": result.get("tags_after", []), + "tags_added": result.get("tags_added", []), + "entities": result.get("entities", {}), + "temporal_links": result.get("temporal_links", []), + "semantic_neighbors": result.get("semantic_neighbors", []), + "patterns_detected": result.get("patterns_detected", []), + "summary": result.get("summary", ""), }, utc_now, ) @@ -2090,12 +2089,15 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]: Returns a dict with enrichment details: - processed: True if enrichment was performed + - content: Original memory content + - tags_before: Tags before enrichment + - tags_after: Tags after enrichment (includes entity tags) + - tags_added: Delta of tags added during enrichment - entities: Dict of extracted entities by category - - temporal_links: Count of temporal relationships created + - temporal_links: List of linked memory IDs (PRECEDED_BY relationships) + - semantic_neighbors: List of (id, score) tuples for similar memories - patterns_detected: List of detected pattern info - - semantic_neighbors: Count of semantic neighbor links - - summary: Generated summary (truncated) - - entity_tags_added: Count of entity tags added + - summary: Generated summary """ graph = get_memory_graph() if graph is None: @@ -2124,7 +2126,9 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]: content = properties.get("content", "") or "" entities = extract_entities(content) - tags = list(dict.fromkeys(_normalize_tag_list(properties.get("tags")))) + # Capture original tags before any modification + original_tags = list(dict.fromkeys(_normalize_tag_list(properties.get("tags")))) + tags = list(original_tags) # Copy for modification entity_tags: Set[str] = set() if entities: @@ -2146,7 +2150,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]: tag_prefixes = _compute_tag_prefixes(tags) - 
temporal_links = find_temporal_relationships(graph, memory_id) + temporal_link_ids = find_temporal_relationships(graph, memory_id) pattern_info = detect_patterns(graph, memory_id, content) semantic_neighbors = link_semantic_neighbors(graph, memory_id) @@ -2163,7 +2167,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]: { "last_run": utc_now(), "forced": forced, - "temporal_links": temporal_links, + "temporal_links": temporal_link_ids, "patterns_detected": pattern_info, "semantic_neighbors": [ {"id": neighbour_id, "score": score} for neighbour_id, score in semantic_neighbors @@ -2213,25 +2217,34 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]: logger.debug( "Enriched memory %s (temporal=%s, patterns=%s, semantic=%s)", memory_id, - temporal_links, + temporal_link_ids, pattern_info, len(semantic_neighbors), ) + # Compute tags added during enrichment + tags_added = sorted(set(tags) - set(original_tags)) + return { "processed": True, + "content": content, + "tags_before": original_tags, + "tags_after": tags, + "tags_added": tags_added, "entities": entities, - "temporal_links": temporal_links, + "temporal_links": temporal_link_ids, + "semantic_neighbors": [(nid[:8], round(score, 3)) for nid, score in semantic_neighbors], "patterns_detected": pattern_info, - "semantic_neighbors": len(semantic_neighbors), - "summary": (summary or "")[:100], - "entity_tags_added": len(entity_tags), + "summary": (summary or ""), } -def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> int: - """Find and create temporal relationships with recent memories.""" - created = 0 +def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> List[str]: + """Find and create temporal relationships with recent memories. + + Returns list of linked memory IDs. 
+ """ + linked_ids: List[str] = [] try: result = graph.query( """ @@ -2262,11 +2275,11 @@ def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> i """, {"id1": memory_id, "id2": related_id, "timestamp": timestamp}, ) - created += 1 + linked_ids.append(related_id) except Exception: logger.exception("Failed to find temporal relationships") - return created + return linked_ids def detect_patterns(graph: Any, memory_id: str, content: str) -> List[Dict[str, Any]]: diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index 1f8ad4f..af85af5 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -153,32 +153,65 @@ def print_enrichment_event(event: dict) -> None: reason = data.get("skip_reason", "") console.print(f"[dim]{ts}[/] [yellow]ENRICH[/] {mem_id} skipped ({reason})") else: - console.print(f"[dim]{ts}[/] [green]ENRICH[/] {mem_id} done ({elapsed}ms)") - - # Entity counts + console.print(f"\n[bold cyan]━━━ ENRICH[/] [dim]{ts}[/] [cyan]({elapsed}ms)[/]") + console.print(f" [dim]memory:[/] {mem_id}") + + # Content + content = data.get("content", "") + if content: + # Indent content lines + content_lines = content[:500].split("\n") + console.print(" [dim]content:[/]") + for line in content_lines[:8]: + console.print(f" {line}") + if len(content) > 500 or len(content_lines) > 8: + console.print(" [dim]...[/]") + + # Tags before/after + tags_before = data.get("tags_before", []) + tags_added = data.get("tags_added", []) + if tags_before or tags_added: + console.print("") + console.print(f" [dim]tags before:[/] {tags_before}") + if tags_added: + console.print(f" [green]tags added:[/] {tags_added}") + + # Entities by category entities = data.get("entities", {}) - if entities: - entity_parts = [f"{k}={v}" for k, v in entities.items()] - console.print(f" [dim]entities:[/] {' '.join(entity_parts)}") - - # Links - temporal = data.get("temporal_links", 0) - semantic = data.get("semantic_neighbors", 0) - patterns = data.get("patterns_detected", 0) - entity_tags = data.get("entity_tags_added", 0) - - if temporal or semantic or patterns: - console.print( - f" [dim]links:[/] temporal={temporal} semantic={semantic} patterns={patterns}" - ) - - if entity_tags: - console.print(f" [dim]entity_tags_added:[/] {entity_tags}") - - # Summary preview - summary = data.get("summary_preview", "") + if entities and any(entities.values()): + console.print("") + console.print(" [dim]entities:[/]") + for category, values in entities.items(): + if values: + console.print(f" {category}: {', '.join(values)}") + + # Links created + temporal_links = data.get("temporal_links", []) + semantic_neighbors = data.get("semantic_neighbors", []) + patterns = data.get("patterns_detected", []) + + if temporal_links or semantic_neighbors or patterns: + console.print("") + console.print(" [dim]links created:[/]") + if temporal_links: + ids = [tid[:8] for tid in temporal_links] + console.print(f" temporal: {', '.join(ids)} ({len(ids)} memories)") + if semantic_neighbors: + neighbor_strs = [f"{nid} ({score})" for nid, score in semantic_neighbors] + console.print(f" semantic: {', '.join(neighbor_strs)}") + if patterns: + for p in patterns: + ptype = p.get("type", "?") + similar = p.get("similar_memories", 0) + console.print(f" patterns: {ptype} ({similar} similar memories)") + + # Summary + summary = data.get("summary", "") if summary: - console.print(f' [dim]summary:[/] "{summary}"') + console.print("") + console.print(f' [dim]summary:[/] "{summary[:100]}"') + + console.print("") # Blank line 
after elif event_type == "enrichment.failed": error = data.get("error", "unknown")[:80] diff --git a/tests/test_enrichment.py b/tests/test_enrichment.py index e8e88c5..5286fb9 100644 --- a/tests/test_enrichment.py +++ b/tests/test_enrichment.py @@ -119,7 +119,7 @@ def test_enrich_memory_updates_metadata(monkeypatch): update_payload = fake_graph.update_calls[-1] metadata = json.loads(update_payload["metadata"]) assert metadata["entities"]["projects"] == ["Launchpad"] - assert metadata["enrichment"]["temporal_links"] == 1 + assert len(metadata["enrichment"]["temporal_links"]) == 1 # Now returns list of IDs assert metadata["enrichment"]["patterns_detected"] assert update_payload["summary"].startswith("Met with Alice") assert "entity:projects:launchpad" in update_payload["tags"] From b16112cc81b7f6984cca4f026526afcaf90118bd Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Fri, 9 Jan 2026 21:30:26 +0100 Subject: [PATCH 05/25] fix: Production bugs - datetime tz, OpenAI tokens, Qdrant race Three critical bugs found in Railway production logs: 1. **DateTime TypeError (500s)** - `_parse_iso_datetime()` now assumes UTC for naive timestamps - Fixes: "can't compare offset-naive and offset-aware datetimes" 2. **OpenAI max_tokens 400 error** - Use `max_completion_tokens` for o-series/gpt-5 models - Use `max_tokens` for older models (gpt-4o-mini, etc.) 3. **Qdrant 404 race condition** - Handle UnexpectedResponse 404 gracefully (point not yet uploaded) - Log as debug instead of error for expected race condition Added test coverage for naive datetime handling. Co-Authored-By: Claude Opus 4.5 --- app.py | 21 ++++++++++++++++- automem/utils/text.py | 11 +++++++-- automem/utils/time.py | 8 +++++-- tests/test_app.py | 52 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 5 deletions(-) diff --git a/app.py b/app.py index 1be67f8..8dfd4bd 100644 --- a/app.py +++ b/app.py @@ -31,6 +31,11 @@ from qdrant_client import QdrantClient from qdrant_client import models as qdrant_models +try: + from qdrant_client.http.exceptions import UnexpectedResponse +except ImportError: # Allow tests to import without full qdrant client installed + UnexpectedResponse = Exception # type: ignore[misc,assignment] + try: # Allow tests to import without full qdrant client installed from qdrant_client.models import Distance, PayloadSchemaType, PointStruct, VectorParams except Exception: # pragma: no cover - degraded import path @@ -780,6 +785,12 @@ def _classify_with_llm(self, content: str) -> Optional[tuple[str, float]]: return None try: + # Use max_completion_tokens for o-series/newer models, max_tokens for others + token_param = ( + {"max_completion_tokens": 50} + if CLASSIFICATION_MODEL.startswith(("o1", "o3", "gpt-5")) + else {"max_tokens": 50} + ) response = state.openai_client.chat.completions.create( model=CLASSIFICATION_MODEL, messages=[ @@ -788,7 +799,7 @@ def _classify_with_llm(self, content: str) -> Optional[tuple[str, float]]: ], response_format={"type": "json_object"}, temperature=0.3, - max_tokens=50, + **token_param, ) result = json.loads(response.choices[0].message.content) @@ -2211,6 +2222,14 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]: "metadata": metadata, }, ) + except UnexpectedResponse as exc: + # 404 means embedding upload hasn't completed yet (race condition) + if exc.status_code == 404: + logger.debug( + "Qdrant payload sync skipped - point not yet uploaded: %s", memory_id[:8] + ) + else: + logger.warning("Qdrant payload sync failed (%d): %s", 
exc.status_code, memory_id) except Exception: logger.exception("Failed to sync Qdrant payload for enriched memory %s", memory_id) diff --git a/automem/utils/text.py b/automem/utils/text.py index 5899342..3ca45d2 100644 --- a/automem/utils/text.py +++ b/automem/utils/text.py @@ -145,7 +145,14 @@ def summarize_content( system_prompt = SUMMARIZE_SYSTEM_PROMPT.format(target_length=target_length) # Estimate tokens from target character length (~4 chars/token), cap at 150 - max_tokens = min(150, max(1, int(target_length / 4))) + token_limit = min(150, max(1, int(target_length / 4))) + + # Use max_completion_tokens for o-series/newer models, max_tokens for others + token_param = ( + {"max_completion_tokens": token_limit} + if model.startswith(("o1", "o3", "gpt-5")) + else {"max_tokens": token_limit} + ) response = openai_client.chat.completions.create( model=model, @@ -154,7 +161,7 @@ def summarize_content( {"role": "user", "content": content}, ], temperature=0.3, - max_tokens=max_tokens, + **token_param, ) summary = response.choices[0].message.content.strip() diff --git a/automem/utils/time.py b/automem/utils/time.py index fae94be..ee2055e 100644 --- a/automem/utils/time.py +++ b/automem/utils/time.py @@ -10,7 +10,7 @@ def utc_now() -> str: def _parse_iso_datetime(value: Optional[str]) -> Optional[datetime]: - """Parse ISO strings that may end with Z into aware datetimes.""" + """Parse ISO strings into timezone-aware datetimes (UTC fallback for naive).""" if not value: return None @@ -22,7 +22,11 @@ def _parse_iso_datetime(value: Optional[str]) -> Optional[datetime]: candidate = candidate[:-1] + "+00:00" try: - return datetime.fromisoformat(candidate) + dt = datetime.fromisoformat(candidate) + # Ensure timezone-aware (assume UTC if naive) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt except ValueError: return None diff --git a/tests/test_app.py b/tests/test_app.py index a98add8..0b2beb9 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -377,3 +377,55 @@ def test_update_last_accessed_handles_none_graph(monkeypatch): # Should not raise, should be a no-op app.update_last_accessed(["some-id"]) + + +# ============================================================================ +# DateTime timezone handling tests (hotfix/production-bugs) +# ============================================================================ + + +def test_parse_iso_datetime_naive_assumes_utc(): + """Naive timestamps (no timezone) should be treated as UTC.""" + from datetime import timezone + + from automem.utils.time import _parse_iso_datetime + + result = _parse_iso_datetime("2024-01-15T10:30:00") + assert result is not None + assert result.tzinfo == timezone.utc + + +def test_parse_iso_datetime_aware_preserved(): + """Timestamps with explicit timezone should preserve it.""" + from datetime import timezone + + from automem.utils.time import _parse_iso_datetime + + # UTC with Z suffix + result = _parse_iso_datetime("2024-01-15T10:30:00Z") + assert result is not None + assert result.tzinfo == timezone.utc + + # With explicit offset + result = _parse_iso_datetime("2024-01-15T10:30:00+05:30") + assert result is not None + assert result.tzinfo is not None + + +def test_result_passes_filters_mixed_naive_aware(): + """Filter comparison should work with mixed naive/aware timestamps.""" + # Function expects result dict with "memory" key containing the timestamp + result = app._result_passes_filters( + result={"memory": {"timestamp": "2024-01-15T10:30:00"}}, # naive + 
start_time="2024-01-14T00:00:00Z", # aware + end_time="2024-01-16T00:00:00Z", # aware + ) + assert result is True + + # Should fail if outside range + result = app._result_passes_filters( + result={"memory": {"timestamp": "2024-01-13T10:30:00"}}, # naive, before start + start_time="2024-01-14T00:00:00Z", + end_time="2024-01-16T00:00:00Z", + ) + assert result is False From ad425b5e3795b90427d52edf057f3b8031f2a418 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Fri, 9 Jan 2026 21:55:22 +0100 Subject: [PATCH 06/25] fix: Correct token parameter for gpt-5 models (max_output_tokens) Per CodeRabbit review: - o-series (o1, o3, etc.): max_completion_tokens - gpt-5 family: max_output_tokens - Others (gpt-4o-mini, etc.): max_tokens Co-Authored-By: Claude Opus 4.5 --- app.py | 13 +++++++------ automem/utils/text.py | 13 +++++++------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/app.py b/app.py index 8dfd4bd..e75d35c 100644 --- a/app.py +++ b/app.py @@ -785,12 +785,13 @@ def _classify_with_llm(self, content: str) -> Optional[tuple[str, float]]: return None try: - # Use max_completion_tokens for o-series/newer models, max_tokens for others - token_param = ( - {"max_completion_tokens": 50} - if CLASSIFICATION_MODEL.startswith(("o1", "o3", "gpt-5")) - else {"max_tokens": 50} - ) + # Use appropriate token parameter based on model family + if CLASSIFICATION_MODEL.startswith("o"): # o-series (o1, o3, etc.) + token_param = {"max_completion_tokens": 50} + elif CLASSIFICATION_MODEL.startswith("gpt-5"): # gpt-5 family + token_param = {"max_output_tokens": 50} + else: # gpt-4o-mini, gpt-4, etc. + token_param = {"max_tokens": 50} response = state.openai_client.chat.completions.create( model=CLASSIFICATION_MODEL, messages=[ diff --git a/automem/utils/text.py b/automem/utils/text.py index 3ca45d2..f502191 100644 --- a/automem/utils/text.py +++ b/automem/utils/text.py @@ -147,12 +147,13 @@ def summarize_content( # Estimate tokens from target character length (~4 chars/token), cap at 150 token_limit = min(150, max(1, int(target_length / 4))) - # Use max_completion_tokens for o-series/newer models, max_tokens for others - token_param = ( - {"max_completion_tokens": token_limit} - if model.startswith(("o1", "o3", "gpt-5")) - else {"max_tokens": token_limit} - ) + # Use appropriate token parameter based on model family + if model.startswith("o"): # o-series (o1, o3, etc.) + token_param = {"max_completion_tokens": token_limit} + elif model.startswith("gpt-5"): # gpt-5 family + token_param = {"max_output_tokens": token_limit} + else: # gpt-4o-mini, gpt-4, etc. 
+ token_param = {"max_tokens": token_limit} response = openai_client.chat.completions.create( model=model, From 00012a2dad96ae3de0d028e65aa40590124e7ea5 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 06:12:32 +0100 Subject: [PATCH 07/25] fix: Defensive coding improvements for automem_watch.py Per CodeRabbit review: - Coerce tags to strings before joining (handle non-string tags) - Truncate large metadata values with ellipsis - Validate score_range is 2-element numeric sequence, fallback to "n/a" - Use explicit httpx.Timeout (connect=10s, read=None, write=10s) Co-Authored-By: Claude Opus 4.5 --- scripts/automem_watch.py | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index af85af5..3fba261 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -53,15 +53,21 @@ def print_store_event(event: dict) -> None: f"[dim]{elapsed}ms[/]" ) - # Tags + # Tags - coerce to strings for safety tags = data.get("tags", []) if tags: - console.print(f" [cyan]tags:[/] {', '.join(tags)}") + safe_tags = [str(t) for t in tags] + console.print(f" [cyan]tags:[/] {', '.join(safe_tags)}") - # Metadata + # Metadata - truncate large values metadata = data.get("metadata", {}) if metadata: - meta_preview = ", ".join(f"{k}={v}" for k, v in list(metadata.items())[:5]) + + def safe_val(v: object, max_len: int = 50) -> str: + s = str(v) if isinstance(v, (str, int, float, bool)) else repr(v) + return s[:max_len] + "..." if len(s) > max_len else s + + meta_preview = ", ".join(f"{k}={safe_val(v)}" for k, v in list(metadata.items())[:5]) if len(metadata) > 5: meta_preview += f", ... (+{len(metadata) - 5} more)" console.print(f" [cyan]metadata:[/] {{{meta_preview}}}") @@ -110,15 +116,24 @@ def print_recall_event(event: dict) -> None: if filters: console.print(f" [yellow]filters:[/] {', '.join(filters)}") - # Stats + # Stats - defensive score_range handling stats = data.get("stats", {}) if stats: avg_len = stats.get("avg_length", 0) avg_tags = stats.get("avg_tags", 0) - score_range = stats.get("score_range", [0, 0]) + score_range = stats.get("score_range", []) + + # Safely format score range + try: + if len(score_range) >= 2: + score_str = f"{float(score_range[0]):.2f}-{float(score_range[1]):.2f}" + else: + score_str = "n/a" + except (TypeError, ValueError): + score_str = "n/a" + console.print( - f" [yellow]stats:[/] avg_len={avg_len} avg_tags={avg_tags} " - f"score_range={score_range[0]:.2f}-{score_range[1]:.2f}" + f" [yellow]stats:[/] avg_len={avg_len} avg_tags={avg_tags} " f"score_range={score_str}" ) # Top results @@ -311,7 +326,9 @@ def stream_events(url: str, token: str) -> None: while True: try: - with httpx.Client(timeout=None) as client: + # Explicit timeout: finite connect/write, infinite read for SSE + timeout = httpx.Timeout(connect=10.0, read=None, write=10.0) + with httpx.Client(timeout=timeout) as client: with client.stream("GET", f"{url}/stream", headers=headers) as resp: if resp.status_code != 200: console.print(f"[red]HTTP {resp.status_code}[/]") From a621599438ebfb2a3ac0f7ab1ee5c08828186e33 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 06:32:33 +0100 Subject: [PATCH 08/25] fix: More defensive coding for automem_watch.py payloads Per CodeRabbit review: - Add _safe_float() and _safe_int() helpers for robust type conversion - Coerce mem_id to string before slicing - Safe conversions for type_conf, importance, elapsed in store events - Safe conversions for 
result_count, dedup, elapsed in recall events - isinstance check for tags_filter before len() - Safe conversions for score, content_length, tags_count in result summaries Co-Authored-By: Claude Opus 4.5 --- scripts/automem_watch.py | 54 +++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index 3fba261..da20b9f 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -31,6 +31,22 @@ def format_timestamp(ts: str) -> str: return ts[:12] +def _safe_float(val: object, default: float = 0.0) -> float: + """Safely convert value to float.""" + try: + return float(val) if val is not None else default + except (TypeError, ValueError): + return default + + +def _safe_int(val: object, default: int = 0) -> int: + """Safely convert value to int.""" + try: + return int(float(val)) if val is not None else default + except (TypeError, ValueError): + return default + + def print_store_event(event: dict) -> None: """Print a memory.store event with full content.""" data = event.get("data", {}) @@ -39,12 +55,12 @@ def print_store_event(event: dict) -> None: console.print() console.print(f"[bold green]━━━ STORE[/] [dim]{ts}[/]") - # Memory info line - mem_id = data.get("memory_id", "?")[:8] + # Memory info line - defensive conversions + mem_id = str(data.get("memory_id", "?"))[:8] mem_type = data.get("type", "Memory") - type_conf = data.get("type_confidence", 0) - importance = data.get("importance", 0.5) - elapsed = data.get("elapsed_ms", 0) + type_conf = _safe_float(data.get("type_confidence"), 0.0) + importance = _safe_float(data.get("importance"), 0.5) + elapsed = _safe_int(data.get("elapsed_ms"), 0) console.print( f" [cyan]id:[/] {mem_id} " @@ -95,22 +111,25 @@ def print_recall_event(event: dict) -> None: console.print() console.print(f"[bold cyan]━━━ RECALL[/] [dim]{ts}[/]") - # Query info + # Query info - defensive conversions query = data.get("query", "") - result_count = data.get("result_count", 0) - dedup = data.get("dedup_removed", 0) - elapsed = data.get("elapsed_ms", 0) + result_count = _safe_int(data.get("result_count"), 0) + dedup = _safe_int(data.get("dedup_removed"), 0) + elapsed = _safe_int(data.get("elapsed_ms"), 0) dedup_str = f" (dedup: {dedup})" if dedup else "" console.print(f' [yellow]query:[/] "{query}"') console.print(f" [yellow]results:[/] {result_count}{dedup_str} [dim]{elapsed}ms[/]") - # Filters + # Filters - defensive check for tags_filter filters = [] if data.get("has_time_filter"): filters.append("time") - if data.get("tags_filter"): - filters.append(f"tags({len(data['tags_filter'])})") + tags_filter = data.get("tags_filter") + if isinstance(tags_filter, (list, tuple, set)): + filters.append(f"tags({len(tags_filter)})") + elif tags_filter: + filters.append("tags(?)") if data.get("vector_search"): filters.append("vector") if filters: @@ -136,16 +155,17 @@ def print_recall_event(event: dict) -> None: f" [yellow]stats:[/] avg_len={avg_len} avg_tags={avg_tags} " f"score_range={score_str}" ) - # Top results + # Top results - defensive field conversions summaries = data.get("result_summaries", []) if summaries: console.print(" [yellow]top results:[/]") for i, r in enumerate(summaries[:3], 1): + r_type = str(r.get("type", "?"))[:8] + r_score = _safe_float(r.get("score"), 0.0) + r_len = _safe_int(r.get("content_length"), 0) + r_tags = _safe_int(r.get("tags_count"), 0) console.print( - f" #{i} [{r.get('type', '?'):8s}] " - f"score={r.get('score', 0):.2f} " - 
f"len={r.get('content_length', 0)} " - f"tags={r.get('tags_count', 0)}" + f" #{i} [{r_type:8s}] " f"score={r_score:.2f} " f"len={r_len} " f"tags={r_tags}" ) From 05fb0b45871d34726f91509c987cfae2e17b1be9 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 06:44:48 +0100 Subject: [PATCH 09/25] fix: Final defensive coding fixes for automem_watch.py Per CodeRabbit review: - Wrap memory_id in enrichment events with str() before slicing - Wrap temporal link IDs with str() before slicing - Wrap memory1_id/memory2_id in associate events with str() - Add return type hint to main() -> None Co-Authored-By: Claude Opus 4.5 --- scripts/automem_watch.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index da20b9f..b3fb05a 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -175,7 +175,7 @@ def print_enrichment_event(event: dict) -> None: ts = format_timestamp(event.get("timestamp", "")) event_type = event.get("type", "") - mem_id = data.get("memory_id", "?")[:8] + mem_id = str(data.get("memory_id", "?"))[:8] if event_type == "enrichment.start": attempt = data.get("attempt", 1) @@ -229,7 +229,7 @@ def print_enrichment_event(event: dict) -> None: console.print("") console.print(" [dim]links created:[/]") if temporal_links: - ids = [tid[:8] for tid in temporal_links] + ids = [str(tid)[:8] for tid in temporal_links] console.print(f" temporal: {', '.join(ids)} ({len(ids)} memories)") if semantic_neighbors: neighbor_strs = [f"{nid} ({score})" for nid, score in semantic_neighbors] @@ -289,8 +289,8 @@ def print_associate_event(event: dict) -> None: data = event.get("data", {}) ts = format_timestamp(event.get("timestamp", "")) - mem1 = data.get("memory1_id", "?")[:8] - mem2 = data.get("memory2_id", "?")[:8] + mem1 = str(data.get("memory1_id", "?"))[:8] + mem2 = str(data.get("memory2_id", "?"))[:8] rel_type = data.get("relation_type", "?") strength = data.get("strength", 0.5) @@ -380,7 +380,7 @@ def stream_events(url: str, token: str) -> None: time.sleep(5) -def main(): +def main() -> None: parser = argparse.ArgumentParser( description="AutoMem tail - stream memory operations", formatter_class=argparse.RawDescriptionHelpFormatter, From 9538ab84d06695c2ac2ecb8c08c115740239da1d Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 07:01:53 +0100 Subject: [PATCH 10/25] refactor(watch): Use Event type alias for all event handler functions Addresses final CodeRabbit review comment about proper type hints. All event handler functions now use Event = Mapping[str, Any] instead of dict for better type safety with SSE payloads. 
Co-Authored-By: Claude Opus 4.5 --- scripts/automem_watch.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index b3fb05a..64fc0c3 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -11,6 +11,7 @@ import sys import textwrap import time +from typing import Any, Mapping try: import httpx @@ -23,9 +24,15 @@ console = Console() +# Type alias for SSE event payloads +Event = Mapping[str, Any] -def format_timestamp(ts: str) -> str: + +def format_timestamp(ts: Any) -> str: """Extract just the time from ISO timestamp.""" + if ts is None: + return "" + ts = str(ts) if "T" in ts: return ts.split("T")[1][:12] return ts[:12] @@ -47,7 +54,7 @@ def _safe_int(val: object, default: int = 0) -> int: return default -def print_store_event(event: dict) -> None: +def print_store_event(event: Event) -> None: """Print a memory.store event with full content.""" data = event.get("data", {}) ts = format_timestamp(event.get("timestamp", "")) @@ -103,7 +110,7 @@ def safe_val(v: object, max_len: int = 50) -> str: console.print(f"[white]{wrapped}[/]") -def print_recall_event(event: dict) -> None: +def print_recall_event(event: Event) -> None: """Print a memory.recall event with query details and result summaries.""" data = event.get("data", {}) ts = format_timestamp(event.get("timestamp", "")) @@ -169,7 +176,7 @@ def print_recall_event(event: dict) -> None: ) -def print_enrichment_event(event: dict) -> None: +def print_enrichment_event(event: Event) -> None: """Print enrichment events with details.""" data = event.get("data", {}) ts = format_timestamp(event.get("timestamp", "")) @@ -258,7 +265,7 @@ def print_enrichment_event(event: dict) -> None: ) -def print_consolidation_event(event: dict) -> None: +def print_consolidation_event(event: Event) -> None: """Print consolidation events.""" data = event.get("data", {}) ts = format_timestamp(event.get("timestamp", "")) @@ -275,7 +282,7 @@ def print_consolidation_event(event: dict) -> None: ) -def print_error_event(event: dict) -> None: +def print_error_event(event: Event) -> None: """Print error events.""" data = event.get("data", {}) ts = format_timestamp(event.get("timestamp", "")) @@ -284,7 +291,7 @@ def print_error_event(event: dict) -> None: console.print(f"[dim]{ts}[/] [bold red]ERROR[/] {error}") -def print_associate_event(event: dict) -> None: +def print_associate_event(event: Event) -> None: """Print a memory.associate event.""" data = event.get("data", {}) ts = format_timestamp(event.get("timestamp", "")) @@ -301,7 +308,7 @@ def print_associate_event(event: dict) -> None: ) -def print_raw_event(event: dict) -> None: +def print_raw_event(event: Event) -> None: """Print any other event as JSON.""" ts = format_timestamp(event.get("timestamp", "")) event_type = event.get("type", "unknown") @@ -315,7 +322,7 @@ def print_raw_event(event: dict) -> None: console.print(syntax) -def process_event(event: dict) -> None: +def process_event(event: Event) -> None: """Route event to appropriate printer.""" event_type = event.get("type", "") From ef85ca00ba1a1c8659a375247e79ae9ab1cd8eee Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 07:03:14 +0100 Subject: [PATCH 11/25] fix(watch): Defensive unpacking for semantic_neighbors Handles malformed 2-element sequences in semantic_neighbors data by using try/except with index access instead of tuple unpacking. 
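As a rough standalone illustration of why index access inside try/except is used here (the sample inputs are invented; the real loop is in the diff below):

    neighbors = [("a1b2c3d4", 0.91), ["e5f6", 0.87], ("bad",), None]

    labels = []
    for item in neighbors:
        try:
            nid, score = item[0], item[1]  # works for any 2+ element sequence
            labels.append(f"{nid} ({score})")
        except (IndexError, TypeError):    # too short, or not subscriptable at all
            labels.append(str(item)[:20])

    print(", ".join(labels))  # a1b2c3d4 (0.91), e5f6 (0.87), ('bad',), None
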
Co-Authored-By: Claude Opus 4.5 --- scripts/automem_watch.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index 64fc0c3..4cf3e1b 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -239,7 +239,13 @@ def print_enrichment_event(event: Event) -> None: ids = [str(tid)[:8] for tid in temporal_links] console.print(f" temporal: {', '.join(ids)} ({len(ids)} memories)") if semantic_neighbors: - neighbor_strs = [f"{nid} ({score})" for nid, score in semantic_neighbors] + neighbor_strs = [] + for item in semantic_neighbors: + try: + nid, score = item[0], item[1] + neighbor_strs.append(f"{nid} ({score})") + except (IndexError, TypeError): + neighbor_strs.append(str(item)[:20]) console.print(f" semantic: {', '.join(neighbor_strs)}") if patterns: for p in patterns: From 5726da87f53be1091a19b9eb5f9cc679cc86fe06 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 07:11:20 +0100 Subject: [PATCH 12/25] fix(watch): SSE multi-line parsing, narrow exceptions, auth error handling - Buffer multi-line data: frames per SSE RFC (join with newlines) - Dispatch on empty line (end of event marker) - Narrow exception handling to httpx.RequestError/HTTPStatusError to avoid masking programming errors (BLE001) - Differentiate 401/403 auth errors (exit with message) from 5xx transient errors (retry with backoff) Co-Authored-By: Claude Opus 4.5 --- scripts/automem_watch.py | 41 +++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index 4cf3e1b..5939766 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -360,9 +360,20 @@ def stream_events(url: str, token: str) -> None: while True: try: # Explicit timeout: finite connect/write, infinite read for SSE - timeout = httpx.Timeout(connect=10.0, read=None, write=10.0) + timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=10.0) with httpx.Client(timeout=timeout) as client: with client.stream("GET", f"{url}/stream", headers=headers) as resp: + # Differentiate auth errors from transient errors + if resp.status_code in (401, 403): + console.print(f"[red]Authentication failed (HTTP {resp.status_code})[/]") + console.print("[dim]Check your API token[/]") + break + if resp.status_code >= 500: + raise httpx.HTTPStatusError( + f"Server error {resp.status_code}", + request=resp.request, + response=resp, + ) if resp.status_code != 200: console.print(f"[red]HTTP {resp.status_code}[/]") break @@ -372,21 +383,33 @@ def stream_events(url: str, token: str) -> None: else: console.print("[green]Connected[/]") + # SSE multi-line data buffer (per RFC 8895) + data_buffer: list[str] = [] + for line in resp.iter_lines(): - if line.startswith("data: "): - try: - event = json.loads(line[6:]) - process_event(event) - except json.JSONDecodeError: - pass + if line.startswith("data:"): + # Accumulate data lines (strip "data:" or "data: " prefix) + payload = line[5:].lstrip(" ") if len(line) > 5 else "" + data_buffer.append(payload) + elif line == "": + # Empty line = end of event, dispatch buffered data + if data_buffer: + full_data = "\n".join(data_buffer) + data_buffer.clear() + try: + event = json.loads(full_data) + process_event(event) + except json.JSONDecodeError: + pass elif line.startswith(":"): - # Keepalive - ignore silently + # Keepalive comment - ignore silently pass except KeyboardInterrupt: console.print("\n[bold]Disconnected.[/]") break - except 
Exception as e: + except (httpx.RequestError, httpx.HTTPStatusError) as e: + # Network/transient errors - retry reconnect_count += 1 console.print(f"[red]Connection error:[/] {e}") console.print(f"[dim]Reconnecting in 5s... (attempt #{reconnect_count})[/]") From f51551e1124c99bbcac6e6ef93e03e7a71eb86a9 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 07:18:54 +0100 Subject: [PATCH 13/25] fix: Log malformed event data in stream_events Adds a console print statement to log malformed event data when a keepalive comment is encountered in the stream_events function. This helps with debugging by providing visibility into unexpected data. --- scripts/automem_watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index 5939766..d463f6c 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -403,7 +403,7 @@ def stream_events(url: str, token: str) -> None: pass elif line.startswith(":"): # Keepalive comment - ignore silently - pass + console.print(f"[dim red]Malformed event data:[/] {full_data[:100]}") except KeyboardInterrupt: console.print("\n[bold]Disconnected.[/]") From 59ea7c47a9268d8845190efaa12870cb769b3118 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 07:27:31 +0100 Subject: [PATCH 14/25] feat: add Codex auto-fix workflow for CI failures --- .github/codex/prompts/fix-ci.md | 36 ++++++++++ .github/workflows/auto-fix.yml | 122 ++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 .github/codex/prompts/fix-ci.md create mode 100644 .github/workflows/auto-fix.yml diff --git a/.github/codex/prompts/fix-ci.md b/.github/codex/prompts/fix-ci.md new file mode 100644 index 0000000..61ed549 --- /dev/null +++ b/.github/codex/prompts/fix-ci.md @@ -0,0 +1,36 @@ +# Fix CI Failures - AutoMem/AutoHub + +You are fixing CI failures for the AutoMem/AutoHub project. This is a Python Flask application with: +- Flask + SSE (Server-Sent Events) +- FalkorDB (graph database) +- Qdrant (vector database) +- Docker/Docker Compose deployment +- pytest for testing + +## Your Task + +Analyze the test failures and CodeRabbit comments below, then make the **minimal changes** needed to fix them. + +## Rules + +1. **Be surgical** - Only change what's necessary to fix the specific issue +2. **Don't refactor** - Resist the urge to "improve" adjacent code +3. **Match existing style** - Follow the patterns already in the codebase +4. **Preserve API** - Don't change endpoints or response formats unless the fix requires it +5. 
**Keep tests passing** - Your fix should not break other tests + +## Common Fixes + +- **Import errors**: Check relative imports, ensure modules exist +- **Type hints**: Add proper typing for function signatures +- **Async issues**: Ensure proper async/await usage +- **Database errors**: Check connection handling and query syntax +- **API errors**: Validate request/response handling + +## What NOT to Do + +- Don't add new features +- Don't refactor working code +- Don't change Docker configuration unless required +- Don't modify environment variable handling unnecessarily +- Don't add excessive logging or comments diff --git a/.github/workflows/auto-fix.yml b/.github/workflows/auto-fix.yml new file mode 100644 index 0000000..9d38e75 --- /dev/null +++ b/.github/workflows/auto-fix.yml @@ -0,0 +1,122 @@ +name: Auto-Fix with Codex + +on: + workflow_run: + workflows: ["CI"] + types: [completed] + workflow_dispatch: + inputs: + prompt: + description: 'Custom instructions (optional)' + required: false + type: string + branch: + description: 'Branch to fix (defaults to current)' + required: false + type: string + +jobs: + auto-fix: + if: | + github.event_name == 'workflow_dispatch' || + github.event.workflow_run.conclusion == 'failure' + runs-on: ubuntu-latest + permissions: + contents: write + pull-requests: write + actions: read + + steps: + - name: Determine branch + id: branch + run: | + if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then + BRANCH="${{ inputs.branch || github.ref_name }}" + else + BRANCH="${{ github.event.workflow_run.head_branch }}" + fi + echo "branch=$BRANCH" >> $GITHUB_OUTPUT + echo "🎯 Target branch: $BRANCH" + + - uses: actions/checkout@v4 + with: + ref: ${{ steps.branch.outputs.branch }} + fetch-depth: 0 + + - name: Setup Codex Auth + run: | + mkdir -p ~/.codex + echo '${{ secrets.CODEX_AUTH_JSON }}' > ~/.codex/auth.json + + - name: Gather failure context + id: context + if: github.event_name == 'workflow_run' + run: | + echo "📋 Fetching CI failure logs..." + gh run view ${{ github.event.workflow_run.id }} --log-failed > /tmp/ci-failures.log 2>&1 || true + + echo "📋 Fetching CodeRabbit comments..." + PR_NUMBER=$(gh pr list --head "${{ steps.branch.outputs.branch }}" --json number -q '.[0].number' 2>/dev/null || echo "") + if [ -n "$PR_NUMBER" ]; then + gh pr view "$PR_NUMBER" --json comments -q '.comments[] | select(.author.login == "coderabbitai") | .body' > /tmp/coderabbit.txt 2>&1 || true + fi + + echo "# CI Failure Context" > /tmp/context.md + if [ -s /tmp/ci-failures.log ]; then + echo "## Test Failures" >> /tmp/context.md + echo '```' >> /tmp/context.md + tail -200 /tmp/ci-failures.log >> /tmp/context.md + echo '```' >> /tmp/context.md + fi + if [ -s /tmp/coderabbit.txt ]; then + echo "## CodeRabbit Comments" >> /tmp/context.md + cat /tmp/coderabbit.txt >> /tmp/context.md + fi + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Prepare prompt + run: | + if [ -n "${{ inputs.prompt }}" ]; then + echo "${{ inputs.prompt }}" > /tmp/prompt.md + elif [ -f ".github/codex/prompts/fix-ci.md" ]; then + cat .github/codex/prompts/fix-ci.md > /tmp/prompt.md + else + cat > /tmp/prompt.md << 'PROMPT' + Fix the CI failures in this repository. + Make minimal, surgical changes only. 
+ PROMPT + fi + + if [ -f /tmp/context.md ]; then + echo "" >> /tmp/prompt.md + echo "---" >> /tmp/prompt.md + cat /tmp/context.md >> /tmp/prompt.md + fi + + - name: Run Codex + uses: openai/codex-action@v1 + with: + prompt-file: /tmp/prompt.md + codex-args: '["--full-auto"]' + sandbox: workspace-write + safety-strategy: drop-sudo + + - name: Check for changes + id: changes + run: | + if git diff --quiet; then + echo "has_changes=false" >> $GITHUB_OUTPUT + else + echo "has_changes=true" >> $GITHUB_OUTPUT + git diff --stat + fi + + - name: Commit and push + if: steps.changes.outputs.has_changes == 'true' + run: | + git config user.name "codex[bot]" + git config user.email "codex[bot]@users.noreply.github.com" + git add . + git commit -m "fix: auto-fix CI failures [skip ci]" + git push From 63f668ada65cd253e504909a0079addca1a38cd9 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 19:50:49 +0100 Subject: [PATCH 15/25] fix(watch): SSE multi-line parsing, narrow exceptions, auth error handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Current stream parsing only handles single-line data: frames; multi-line data: events (valid per SSE RFC 8030) are dropped/corrupted. Ruff config enables BLE001, which flags except Exception without re-raising—this can cause infinite reconnect loops on programmer errors like AttributeError. Non-200 responses don't differentiate between auth failures (401/403) that shouldn't retry and transient errors that should. Changes: - Buffer multi-line data: frames per SSE RFC (join with newlines) - Dispatch on empty line (end of event marker) - Narrow exception handling to httpx.RequestError/HTTPStatusError - Differentiate 401/403 (exit) from 5xx (retry with backoff) - SSL-specific error detection with exponential backoff (Railway LB issue) - Add --max-reconnects flag to limit reconnection attempts - Fix misplaced malformed data logging (was under keepalive handler) - Log malformed JSON event data for debugging Co-Authored-By: Claude Opus 4.5 --- scripts/automem_watch.py | 57 +++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index d463f6c..e48be2e 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -348,10 +348,17 @@ def process_event(event: Event) -> None: print_raw_event(event) -def stream_events(url: str, token: str) -> None: - """Connect to SSE stream and print events.""" +def stream_events(url: str, token: str, max_reconnects: int = 0) -> None: + """Connect to SSE stream and print events. 
+ + Args: + url: AutoMem API base URL + token: API authentication token + max_reconnects: Max reconnection attempts (0 = unlimited) + """ headers = {"Authorization": f"Bearer {token}"} reconnect_count = 0 + consecutive_ssl_errors = 0 console.print(f"[bold]Connecting to {url}/stream...[/]") console.print("[dim]Press Ctrl+C to stop[/]") @@ -378,6 +385,9 @@ def stream_events(url: str, token: str) -> None: console.print(f"[red]HTTP {resp.status_code}[/]") break + # Reset SSL error counter on successful connect + consecutive_ssl_errors = 0 + if reconnect_count > 0: console.print(f"[green]Connected[/] [dim](reconnect #{reconnect_count})[/]") else: @@ -400,20 +410,42 @@ def stream_events(url: str, token: str) -> None: event = json.loads(full_data) process_event(event) except json.JSONDecodeError: - pass + console.print( + f"[dim red]Malformed event data:[/] {full_data[:100]}" + ) elif line.startswith(":"): # Keepalive comment - ignore silently - console.print(f"[dim red]Malformed event data:[/] {full_data[:100]}") + pass except KeyboardInterrupt: console.print("\n[bold]Disconnected.[/]") break except (httpx.RequestError, httpx.HTTPStatusError) as e: - # Network/transient errors - retry reconnect_count += 1 - console.print(f"[red]Connection error:[/] {e}") - console.print(f"[dim]Reconnecting in 5s... (attempt #{reconnect_count})[/]") - time.sleep(5) + error_str = str(e) + + # Detect SSL errors (Railway load balancer kills idle SSE connections) + is_ssl_error = "SSL" in error_str or "ssl" in error_str or "handshake" in error_str + + if is_ssl_error: + consecutive_ssl_errors += 1 + # Exponential backoff for SSL errors: 2s, 4s, 8s, 16s, max 30s + backoff = min(30, 2 ** min(consecutive_ssl_errors, 4)) + console.print( + f"[yellow]SSL connection reset[/] [dim](Railway LB, backoff {backoff}s)[/]" + ) + else: + consecutive_ssl_errors = 0 + backoff = 5 + console.print(f"[red]Connection error:[/] {e}") + + # Check max reconnects + if max_reconnects > 0 and reconnect_count >= max_reconnects: + console.print(f"[red]Max reconnects ({max_reconnects}) reached. Exiting.[/]") + break + + console.print(f"[dim]Reconnecting in {backoff}s... 
(attempt #{reconnect_count})[/]") + time.sleep(backoff) def main() -> None: @@ -424,13 +456,20 @@ def main() -> None: Examples: python scripts/automem_watch.py --url http://localhost:8001 --token dev python scripts/automem_watch.py --url https://automem.up.railway.app --token $TOKEN + python scripts/automem_watch.py --url $URL --token $TOKEN --max-reconnects 100 """, ) parser.add_argument("--url", required=True, help="AutoMem API URL") parser.add_argument("--token", required=True, help="API token") + parser.add_argument( + "--max-reconnects", + type=int, + default=0, + help="Max reconnection attempts (0 = unlimited, default)", + ) args = parser.parse_args() - stream_events(args.url.rstrip("/"), args.token) + stream_events(args.url.rstrip("/"), args.token, args.max_reconnects) if __name__ == "__main__": From 824a77422f713d56a2ad97a9a9a248b364a08e21 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 19:56:47 +0100 Subject: [PATCH 16/25] feat: Add web-based SSE monitor dashboard Browser-based alternative to automem_watch.py: - Visit /monitor to open real-time event stream dashboard - Token stored in localStorage after first prompt (or via ?token=X) - Clickable expandable details for all event types - Color-coded event types (store/recall/enrichment/consolidation) - Raw JSON toggle for debugging - Auto-reconnect handled by browser's EventSource - No SSL issues (browser TLS stack) - Mobile-friendly responsive design Co-Authored-By: Claude Opus 4.5 --- app.py | 8 +- static/monitor.html | 495 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 502 insertions(+), 1 deletion(-) create mode 100644 static/monitor.html diff --git a/app.py b/app.py index e75d35c..a4435ca 100644 --- a/app.py +++ b/app.py @@ -110,7 +110,7 @@ def __init__(self, id: str, vector: List[float], payload: Dict[str, Any]): if str(root) not in sys.path: sys.path.insert(0, str(root)) -app = Flask(__name__) +app = Flask(__name__, static_folder="static") # Legacy blueprint placeholders for deprecated route definitions below. # These are not registered with the app and are safe to keep until full removal. @@ -3823,6 +3823,12 @@ def get_related_memories(memory_id: str) -> Any: app.register_blueprint(stream_bp) +@app.route("/monitor") +def monitor_page() -> Any: + """Serve the real-time SSE monitor dashboard.""" + return app.send_static_file("monitor.html") + + if __name__ == "__main__": port = int(os.environ.get("PORT", "8001")) logger.info("Starting Flask API on port %s", port) diff --git a/static/monitor.html b/static/monitor.html new file mode 100644 index 0000000..4d988c5 --- /dev/null +++ b/static/monitor.html @@ -0,0 +1,495 @@ + + + + + + AutoMem Monitor + + + +
[remainder of the added static/monitor.html is garbled here — HTML tags were stripped in extraction. The page body provides an "AutoMem Monitor" header with Events/Reconnects counters, a connection status badge ("Connecting..."), and an events container with a "Waiting for events..." empty state, plus the page's inline script (not recoverable here).]
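As a rough command-line counterpart to the dashboard added above, the same /stream endpoint can be consumed from Python. This is a sketch only: the URL and token are placeholders, and it assumes the single-line "data:" framing used elsewhere in this series.

    import json
    import httpx

    URL, TOKEN = "http://localhost:8001", "dev"  # placeholders

    timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=10.0)
    with httpx.Client(timeout=timeout) as client:
        with client.stream(
            "GET", f"{URL}/stream", headers={"Authorization": f"Bearer {TOKEN}"}
        ) as resp:
            for line in resp.iter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[6:])
                    print(event["type"], event["timestamp"])
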
+ + + + From ab08c1b498e725fe47aa3d5d94dfc4561c1072c9 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 20:27:19 +0100 Subject: [PATCH 17/25] feat(monitor): Add login screen with shared auth (Graph Viewer style) - Replace prompt() with proper login form UI - Token stored in localStorage (key: automem_token) - Auto-connect on page load if token exists - Show error on invalid token / connection failure - Add disconnect button to clear token and return to login - /monitor endpoint whitelisted (serves just login form, no data) - Shared auth with future /graph endpoint (same localStorage key) Design matches graph-viewer-production.up.railway.app for consistency. Login to either module authenticates both. Co-Authored-By: Claude Opus 4.5 --- app.py | 4 +- static/monitor.html | 350 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 288 insertions(+), 66 deletions(-) diff --git a/app.py b/app.py index a4435ca..440ae9e 100644 --- a/app.py +++ b/app.py @@ -1142,9 +1142,9 @@ def require_api_token() -> None: if not API_TOKEN: return - # Allow unauthenticated health checks (supports blueprint endpoint names) + # Allow unauthenticated health checks and monitor page endpoint = request.endpoint or "" - if endpoint.endswith("health") or request.path == "/health": + if endpoint.endswith("health") or request.path in ("/health", "/monitor"): return token = _extract_api_token() diff --git a/static/monitor.html b/static/monitor.html index 4d988c5..6306c6d 100644 --- a/static/monitor.html +++ b/static/monitor.html @@ -11,6 +11,8 @@ --border: #30363d; --text: #c9d1d9; --muted: #8b949e; + --accent: #a371f7; + --accent-hover: #b687f8; --store: #238636; --recall: #1f6feb; --enrichment: #a371f7; @@ -24,8 +26,119 @@ background: var(--bg); color: var(--text); margin: 0; - padding: 20px; + padding: 0; line-height: 1.5; + min-height: 100vh; + } + + /* Login Screen */ + #login-screen { + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + min-height: 100vh; + padding: 20px; + } + .login-logo { + width: 80px; + height: 80px; + background: linear-gradient(135deg, var(--accent), #8b5cf6); + border-radius: 20px; + display: flex; + align-items: center; + justify-content: center; + font-size: 2rem; + font-weight: bold; + color: white; + margin-bottom: 24px; + } + .login-title { + font-size: 1.75rem; + font-weight: 600; + color: var(--accent); + margin: 0 0 8px; + } + .login-subtitle { + color: var(--muted); + margin: 0 0 32px; + } + .login-card { + background: var(--card); + border: 1px solid var(--border); + border-radius: 12px; + padding: 32px; + width: 100%; + max-width: 400px; + } + .login-label { + display: block; + font-size: 0.9rem; + font-weight: 500; + margin-bottom: 8px; + } + .login-input { + width: 100%; + padding: 12px 16px; + background: var(--bg); + border: 1px solid var(--border); + border-radius: 8px; + color: var(--text); + font-size: 1rem; + margin-bottom: 8px; + } + .login-input:focus { + outline: none; + border-color: var(--accent); + } + .login-input::placeholder { + color: var(--muted); + } + .login-hint { + font-size: 0.8rem; + color: var(--muted); + margin: 0 0 24px; + } + .login-btn { + width: 100%; + padding: 12px 24px; + background: linear-gradient(135deg, var(--accent), #6366f1); + border: none; + border-radius: 8px; + color: white; + font-size: 1rem; + font-weight: 500; + cursor: pointer; + transition: opacity 0.2s; + } + .login-btn:hover { + opacity: 0.9; + } + .login-btn:disabled { + opacity: 0.5; + cursor: not-allowed; + } + 
.login-error { + background: rgba(248, 81, 73, 0.1); + border: 1px solid var(--error); + border-radius: 8px; + padding: 12px 16px; + color: var(--error); + font-size: 0.9rem; + margin-bottom: 16px; + display: none; + } + .login-footer { + margin-top: 24px; + font-size: 0.85rem; + color: var(--muted); + text-align: center; + } + + /* Monitor Screen */ + #monitor-screen { + display: none; + padding: 20px; } header { display: flex; @@ -36,6 +149,11 @@ border-bottom: 1px solid var(--border); } h1 { margin: 0; font-size: 1.5rem; } + .header-right { + display: flex; + align-items: center; + gap: 16px; + } #status { padding: 4px 12px; border-radius: 20px; @@ -45,6 +163,20 @@ #status.connected { background: var(--store); } #status.disconnected { background: var(--error); } #status.connecting { background: var(--consolidation); } + .disconnect-btn { + padding: 6px 12px; + background: transparent; + border: 1px solid var(--border); + border-radius: 6px; + color: var(--muted); + font-size: 0.85rem; + cursor: pointer; + transition: all 0.2s; + } + .disconnect-btn:hover { + border-color: var(--error); + color: var(--error); + } #events { display: flex; flex-direction: column; gap: 8px; } @@ -177,50 +309,171 @@ -
[static/monitor.html hunks garbled here — markup tags were stripped in extraction. The change replaces the prompt()-based flow with a login screen (logo, "AutoMem Monitor" title, token input, connect button, error box) and wraps the dashboard in a separate monitor screen whose header gains a disconnect button next to the status badge and the Events/Reconnects counters; the "Connecting..." and "Waiting for events..." states are retained.]
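Conceptually, the whitelist added to require_api_token reduces to an early return for the health check and the monitor page. A stripped-down sketch of that gate (names follow the diff above; everything else is simplified):

    def is_public(endpoint: str, path: str) -> bool:
        """Return True for routes served without an API token."""
        return endpoint.endswith("health") or path in ("/health", "/monitor")

    assert is_public("health", "/health")
    assert is_public("monitor_page", "/monitor")
    assert not is_public("stream.stream", "/stream")
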
From e7661401f4b36e2cc5a25c45da4ea58f3e0a504c Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 20:41:50 +0100 Subject: [PATCH 18/25] fix(monitor): Use api_key param instead of token for SSE auth Server's _extract_api_token() only accepts api_key query param, not token. This fixes 401 errors when connecting to /stream. Co-Authored-By: Claude Opus 4.5 --- static/monitor.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/static/monitor.html b/static/monitor.html index 6306c6d..6f3c0db 100644 --- a/static/monitor.html +++ b/static/monitor.html @@ -436,7 +436,7 @@


currentToken = token; const baseUrl = window.location.origin; - eventSource = new EventSource(`${baseUrl}/stream?token=${token}`); + eventSource = new EventSource(`${baseUrl}/stream?api_key=${token}`); // Track if we've successfully connected at least once let hasConnected = false; From 78d002ca4bc707a4a1484f6271b2848fb446e4fc Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sat, 10 Jan 2026 21:43:30 +0100 Subject: [PATCH 19/25] feat(monitor): Unified operations dashboard with status cards and inline data - Add status header cards (Health, Memories, Enrichment, Consolidation) - Show expanded event cards with inline tags, entities, links with scores - Add performance badges (green/yellow/red) based on configurable thresholds - Add filter controls (All/Store/Recall/Enrich/Consolidation/Errors) - Add rolling stats (event count, error count, avg latency) - Auto-refresh status endpoints every 30 seconds - Quality badges for warnings (large content, low confidence, no tags) Co-Authored-By: Claude Opus 4.5 --- static/monitor.html | 908 ++++++++++++++++++++++++++++---------------- 1 file changed, 586 insertions(+), 322 deletions(-) diff --git a/static/monitor.html b/static/monitor.html index 6f3c0db..1420434 100644 --- a/static/monitor.html +++ b/static/monitor.html @@ -19,14 +19,13 @@ --consolidation: #f0883e; --error: #f85149; --associate: #3fb950; + --warning: #d29922; } - * { box-sizing: border-box; } + * { box-sizing: border-box; margin: 0; padding: 0; } body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Helvetica, Arial, sans-serif; background: var(--bg); color: var(--text); - margin: 0; - padding: 0; line-height: 1.5; min-height: 100vh; } @@ -53,16 +52,8 @@ color: white; margin-bottom: 24px; } - .login-title { - font-size: 1.75rem; - font-weight: 600; - color: var(--accent); - margin: 0 0 8px; - } - .login-subtitle { - color: var(--muted); - margin: 0 0 32px; - } + .login-title { font-size: 1.75rem; font-weight: 600; color: var(--accent); margin-bottom: 8px; } + .login-subtitle { color: var(--muted); margin-bottom: 32px; } .login-card { background: var(--card); border: 1px solid var(--border); @@ -71,12 +62,7 @@ width: 100%; max-width: 400px; } - .login-label { - display: block; - font-size: 0.9rem; - font-weight: 500; - margin-bottom: 8px; - } + .login-label { display: block; font-size: 0.9rem; font-weight: 500; margin-bottom: 8px; } .login-input { width: 100%; padding: 12px 16px; @@ -87,18 +73,8 @@ font-size: 1rem; margin-bottom: 8px; } - .login-input:focus { - outline: none; - border-color: var(--accent); - } - .login-input::placeholder { - color: var(--muted); - } - .login-hint { - font-size: 0.8rem; - color: var(--muted); - margin: 0 0 24px; - } + .login-input:focus { outline: none; border-color: var(--accent); } + .login-hint { font-size: 0.8rem; color: var(--muted); margin-bottom: 24px; } .login-btn { width: 100%; padding: 12px 24px; @@ -109,106 +85,183 @@ font-size: 1rem; font-weight: 500; cursor: pointer; - transition: opacity 0.2s; - } - .login-btn:hover { - opacity: 0.9; - } - .login-btn:disabled { - opacity: 0.5; - cursor: not-allowed; } + .login-btn:disabled { opacity: 0.5; cursor: not-allowed; } .login-error { background: rgba(248, 81, 73, 0.1); border: 1px solid var(--error); border-radius: 8px; padding: 12px 16px; color: var(--error); - font-size: 0.9rem; margin-bottom: 16px; display: none; } - .login-footer { - margin-top: 24px; - font-size: 0.85rem; - color: var(--muted); - text-align: center; - } + .login-footer { margin-top: 24px; font-size: 0.85rem; 
color: var(--muted); } /* Monitor Screen */ - #monitor-screen { - display: none; - padding: 20px; - } - header { + #monitor-screen { display: none; } + + /* Header */ + .header { display: flex; justify-content: space-between; align-items: center; - margin-bottom: 20px; - padding-bottom: 20px; + padding: 16px 20px; border-bottom: 1px solid var(--border); + background: var(--card); } - h1 { margin: 0; font-size: 1.5rem; } - .header-right { - display: flex; - align-items: center; - gap: 16px; - } - #status { + .header h1 { font-size: 1.25rem; font-weight: 600; } + .header-right { display: flex; align-items: center; gap: 12px; } + .status-badge { padding: 4px 12px; border-radius: 20px; - font-size: 0.85rem; + font-size: 0.8rem; font-weight: 500; } - #status.connected { background: var(--store); } - #status.disconnected { background: var(--error); } - #status.connecting { background: var(--consolidation); } + .status-badge.connected { background: var(--store); color: white; } + .status-badge.disconnected { background: var(--error); color: white; } + .status-badge.connecting { background: var(--consolidation); color: white; } .disconnect-btn { padding: 6px 12px; background: transparent; border: 1px solid var(--border); border-radius: 6px; color: var(--muted); + font-size: 0.8rem; + cursor: pointer; + } + .disconnect-btn:hover { border-color: var(--error); color: var(--error); } + + /* Status Cards */ + .status-cards { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(160px, 1fr)); + gap: 12px; + padding: 16px 20px; + background: var(--bg); + border-bottom: 1px solid var(--border); + } + .status-card { + background: var(--card); + border: 1px solid var(--border); + border-radius: 8px; + padding: 12px 16px; + } + .card-title { + font-size: 0.7rem; + font-weight: 600; + text-transform: uppercase; + color: var(--muted); + margin-bottom: 8px; + letter-spacing: 0.5px; + } + .card-row { + display: flex; + align-items: center; + gap: 8px; font-size: 0.85rem; + margin-bottom: 4px; + } + .card-row:last-child { margin-bottom: 0; } + .card-value { font-weight: 600; font-size: 1.25rem; } + .card-label { color: var(--muted); font-size: 0.8rem; } + .status-dot { + width: 8px; + height: 8px; + border-radius: 50%; + flex-shrink: 0; + } + .status-dot.green { background: var(--store); } + .status-dot.yellow { background: var(--warning); } + .status-dot.red { background: var(--error); } + .status-dot.gray { background: var(--muted); } + + /* Stats Bar */ + .stats-bar { + display: flex; + align-items: center; + gap: 24px; + padding: 12px 20px; + background: var(--card); + border-bottom: 1px solid var(--border); + font-size: 0.85rem; + } + .stat { color: var(--muted); } + .stat-value { color: var(--text); font-weight: 600; } + .stat-value.error { color: var(--error); } + + /* Filter Bar */ + .filter-bar { + display: flex; + gap: 8px; + padding: 12px 20px; + border-bottom: 1px solid var(--border); + flex-wrap: wrap; + } + .filter-btn { + padding: 6px 14px; + border-radius: 6px; + background: transparent; + border: 1px solid var(--border); + color: var(--muted); + font-size: 0.8rem; cursor: pointer; transition: all 0.2s; } - .disconnect-btn:hover { - border-color: var(--error); - color: var(--error); + .filter-btn:hover { border-color: var(--accent); color: var(--text); } + .filter-btn.active { background: var(--accent); border-color: var(--accent); color: white; } + .filter-btn .badge { + background: var(--error); + color: white; + padding: 1px 6px; + border-radius: 10px; + font-size: 0.7rem; + 
margin-left: 4px; } - #events { display: flex; flex-direction: column; gap: 8px; } + /* Events Container */ + .events-container { padding: 16px 20px; } - .event { + /* Event Cards */ + .event-card { background: var(--card); border: 1px solid var(--border); - border-radius: 6px; + border-radius: 8px; + margin-bottom: 10px; overflow: hidden; + transition: opacity 0.3s, transform 0.3s; + animation: fadeIn 0.3s ease; + } + @keyframes fadeIn { + from { opacity: 0; transform: translateY(-10px); } + to { opacity: 1; transform: translateY(0); } } + .event-card.error { border-color: var(--error); background: rgba(248, 81, 73, 0.05); } + .event-card.hidden { display: none; } + .event-header { display: flex; align-items: center; gap: 12px; padding: 12px 16px; cursor: pointer; - user-select: none; } - .event-header:hover { background: rgba(255,255,255,0.03); } + .event-header:hover { background: rgba(255,255,255,0.02); } .event-type { - font-size: 0.75rem; + font-size: 0.7rem; font-weight: 600; text-transform: uppercase; - padding: 2px 8px; + padding: 3px 8px; border-radius: 4px; - color: #fff; + color: white; + flex-shrink: 0; } .type-store { background: var(--store); } .type-recall { background: var(--recall); } - .type-enrichment { background: var(--enrichment); } - .type-consolidation { background: var(--consolidation); } - .type-error { background: var(--error); } + .type-enrichment, .type-complete, .type-start { background: var(--enrichment); } + .type-consolidation, .type-run { background: var(--consolidation); } + .type-failed, .type-error { background: var(--error); } .type-associate { background: var(--associate); } .event-summary { @@ -219,53 +272,93 @@ overflow: hidden; text-overflow: ellipsis; } + + .perf-badge { + padding: 2px 8px; + border-radius: 4px; + font-size: 0.75rem; + font-family: monospace; + flex-shrink: 0; + } + .perf-good { color: var(--store); } + .perf-warn { background: rgba(210, 153, 34, 0.2); color: var(--warning); } + .perf-slow { background: rgba(248, 81, 73, 0.2); color: var(--error); } + .event-time { - font-size: 0.8rem; + font-size: 0.75rem; color: var(--muted); font-family: monospace; + flex-shrink: 0; } + .event-toggle { color: var(--muted); + font-size: 0.8rem; transition: transform 0.2s; } - .event.open .event-toggle { transform: rotate(90deg); } + .event-card.open .event-toggle { transform: rotate(90deg); } - .event-details { - display: none; - padding: 0 16px 16px; + /* Event Body - Inline Data */ + .event-body { + padding: 0 16px 14px; + font-size: 0.85rem; border-top: 1px solid var(--border); } - .event.open .event-details { display: block; } + .event-card:not(.open) .event-body { display: none; } - .detail-grid { - display: grid; - grid-template-columns: 120px 1fr; - gap: 8px 16px; - font-size: 0.85rem; - } - .detail-label { - color: var(--muted); - font-weight: 500; - } - .detail-value { - color: var(--text); - word-break: break-word; + .event-row { + display: flex; + flex-wrap: wrap; + gap: 16px; + margin-top: 10px; } - .detail-value.mono { - font-family: monospace; - font-size: 0.8rem; + .event-field { + display: flex; + gap: 6px; } + .field-label { color: var(--muted); } + .field-value { color: var(--text); } - .tags { + .tags-list { display: flex; flex-wrap: wrap; gap: 4px; + margin-top: 8px; } .tag { - background: rgba(255,255,255,0.1); + background: rgba(255,255,255,0.08); padding: 2px 8px; border-radius: 4px; font-size: 0.75rem; + color: var(--text); + } + .tag.added { background: rgba(35, 134, 54, 0.3); color: var(--store); } + .tag.entity { 
background: rgba(163, 113, 247, 0.2); color: var(--accent); } + + .links-section { margin-top: 10px; } + .link-row { + display: flex; + gap: 8px; + margin-top: 4px; + font-size: 0.8rem; + } + .link-type { color: var(--muted); min-width: 70px; } + .link-value { color: var(--text); font-family: monospace; } + .link-score { color: var(--accent); } + + .error-message { + color: var(--error); + margin-top: 8px; + padding: 8px 12px; + background: rgba(248, 81, 73, 0.1); + border-radius: 4px; + } + + .content-preview { + margin-top: 8px; + color: var(--text); + font-size: 0.85rem; + line-height: 1.4; } .json-toggle { @@ -276,7 +369,7 @@ .json-toggle summary { cursor: pointer; color: var(--muted); - font-size: 0.8rem; + font-size: 0.75rem; } .json-toggle pre { margin: 8px 0 0; @@ -284,28 +377,38 @@ background: var(--bg); border-radius: 4px; overflow-x: auto; - font-size: 0.75rem; + font-size: 0.7rem; } - #empty { + /* Empty State */ + .empty-state { text-align: center; padding: 60px 20px; color: var(--muted); } - #empty svg { + .empty-state svg { width: 48px; height: 48px; margin-bottom: 16px; opacity: 0.5; } - .stats { - display: flex; - gap: 20px; - font-size: 0.85rem; - color: var(--muted); + /* Quality Badges */ + .quality-badge { + font-size: 0.7rem; + padding: 2px 6px; + border-radius: 4px; + margin-left: 8px; + } + .quality-badge.warn { background: rgba(210, 153, 34, 0.2); color: var(--warning); } + .quality-badge.large { background: rgba(248, 81, 73, 0.2); color: var(--error); } + .quality-badge.low-conf { background: rgba(139, 148, 158, 0.2); color: var(--muted); } + + /* Responsive */ + @media (max-width: 768px) { + .status-cards { grid-template-columns: 1fr 1fr; } + .stats-bar { flex-wrap: wrap; gap: 12px; } } - .stat-value { color: var(--text); font-weight: 600; } @@ -313,8 +416,7 @@

[static/monitor.html hunks garbled here — markup tags were stripped in extraction. The dashboard body adds status cards (Health: FalkorDB/Qdrant; Memories: total memories/Vectors; Enrichment: Queue/Failed/Pending; Consolidation: Decay/Creative/Next), a stats bar (Events, Errors, Avg Latency, Reconnects), filter buttons (All/Store/Recall/Enrich/Consolidation/Errors), and the events container with its "Waiting for events..." empty state, along with the inline script that drives them.]
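The commit message mentions green/yellow/red performance badges driven by configurable thresholds; the exact values live in the garbled inline script above, so the following is only an assumed illustration of the idea, with invented thresholds:

    # Hypothetical latency thresholds (ms) per operation type -- not the dashboard's real values.
    THRESHOLDS = {"store": (100, 500), "recall": (200, 1000), "enrichment": (2000, 10000)}

    def perf_badge(op: str, elapsed_ms: int) -> str:
        """Classify a latency as good / warn / slow for display."""
        good, warn = THRESHOLDS.get(op, (500, 2000))
        if elapsed_ms <= good:
            return "good"   # rendered green
        if elapsed_ms <= warn:
            return "warn"   # rendered yellow
        return "slow"       # rendered red

    print(perf_badge("recall", 150), perf_badge("store", 750), perf_badge("enrichment", 20000))
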
From 6d5d8bed2aa923d96d208883849a740344443e11 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sun, 11 Jan 2026 02:05:15 +0100 Subject: [PATCH 20/25] feat: Add CORS support for cross-origin clients - Add flask-cors 4.0.0 dependency - Enable CORS globally for Graph Viewer and other cross-origin clients Co-Authored-By: Claude Opus 4.5 --- app.py | 2 ++ requirements.txt | 1 + 2 files changed, 3 insertions(+) diff --git a/app.py b/app.py index 440ae9e..bc9531c 100644 --- a/app.py +++ b/app.py @@ -28,6 +28,7 @@ from falkordb import FalkorDB from flask import Blueprint, Flask, abort, jsonify, request +from flask_cors import CORS from qdrant_client import QdrantClient from qdrant_client import models as qdrant_models @@ -111,6 +112,7 @@ def __init__(self, id: str, vector: List[float], payload: Dict[str, Any]): sys.path.insert(0, str(root)) app = Flask(__name__, static_folder="static") +CORS(app) # Enable CORS for Graph Viewer and other cross-origin clients # Legacy blueprint placeholders for deprecated route definitions below. # These are not registered with the app and are safe to keep until full removal. diff --git a/requirements.txt b/requirements.txt index 73637e1..e635b73 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ # requirements.txt - Updated versions for 2024/2025 flask==3.0.3 +flask-cors==4.0.0 falkordb==1.0.9 qdrant-client==1.11.3 python-dotenv==1.0.1 From 8eabae64a68cd0a73052eb76cf59eec6283e1403 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sun, 11 Jan 2026 22:32:01 +0100 Subject: [PATCH 21/25] feat(stream): Add optional JSONL event logging with history hydration - Add AUTOMEM_EVENT_LOG env var to enable file-based event logging - Add AUTOMEM_EVENT_LOG_MAX to limit log size (default: 500 events) - Events appended to JSONL file, auto-truncated when limit exceeded - New /stream/history endpoint returns cached events for UI hydration - New /stream/log-status endpoint shows log enabled state and size - Monitor UI fetches history on connect and displays log status - Historical events render without animation Co-Authored-By: Claude Opus 4.5 --- automem/api/stream.py | 120 ++++++++++++++++++++++++++++++++++++++++++ static/monitor.html | 57 ++++++++++++++++++-- 2 files changed, 174 insertions(+), 3 deletions(-) diff --git a/automem/api/stream.py b/automem/api/stream.py index 98f0cb1..a6c6115 100644 --- a/automem/api/stream.py +++ b/automem/api/stream.py @@ -3,11 +3,15 @@ Provides a /stream endpoint that emits events for memory operations, enrichment, and consolidation tasks. Uses an in-memory subscriber pattern with bounded queues per client. + +Optionally logs events to a JSONL file for persistence (env: AUTOMEM_EVENT_LOG). """ from __future__ import annotations import json +import os +from pathlib import Path from queue import Empty, Full, Queue from threading import Lock from typing import Any, Callable, Dict, Generator, List @@ -18,11 +22,50 @@ _subscribers: List[Queue] = [] _subscribers_lock = Lock() +# Event log configuration +_event_log_path = os.getenv("AUTOMEM_EVENT_LOG", "") +_event_log_max = int(os.getenv("AUTOMEM_EVENT_LOG_MAX", "500")) +_event_log_lock = Lock() + + +def _write_event_to_log(event: Dict[str, Any]) -> None: + """Append event to JSONL log file, truncating if needed. + + Thread-safe. Only writes if AUTOMEM_EVENT_LOG is set. 
+ """ + if not _event_log_path: + return + + with _event_log_lock: + path = Path(_event_log_path) + path.parent.mkdir(parents=True, exist_ok=True) + + # Read existing events + events = [] + if path.exists(): + try: + with open(path, "r") as f: + events = [line.strip() for line in f if line.strip()] + except Exception: + events = [] + + # Append new event + events.append(json.dumps(event)) + + # Truncate to max + if len(events) > _event_log_max: + events = events[-_event_log_max:] + + # Write back + with open(path, "w") as f: + f.write("\n".join(events) + "\n") + def emit_event(event_type: str, data: Dict[str, Any], utc_now: Callable[[], str]) -> None: """Emit an event to all SSE subscribers. Thread-safe. Drops events if a subscriber queue is full (slow client). + Also writes to JSONL log file if AUTOMEM_EVENT_LOG is configured. Args: event_type: Event type (e.g., "memory.store", "consolidation.run") @@ -34,6 +77,10 @@ def emit_event(event_type: str, data: Dict[str, Any], utc_now: Callable[[], str] "timestamp": utc_now(), "data": data, } + + # Write to log file if enabled + _write_event_to_log(event) + event_str = f"data: {json.dumps(event)}\n\n" with _subscribers_lock: @@ -50,6 +97,61 @@ def get_subscriber_count() -> int: return len(_subscribers) +def get_event_history(limit: int = 100) -> List[Dict[str, Any]]: + """Return recent events from the log file. + + Args: + limit: Maximum number of events to return + + Returns: + List of event dictionaries, oldest first + """ + if not _event_log_path: + return [] + + path = Path(_event_log_path) + if not path.exists(): + return [] + + with _event_log_lock: + try: + with open(path, "r") as f: + lines = [line.strip() for line in f if line.strip()] + # Return last N events + return [json.loads(line) for line in lines[-limit:]] + except Exception: + return [] + + +def get_log_status() -> Dict[str, Any]: + """Return event log status for display. + + Returns: + Dict with enabled, path, size_bytes, event_count, max_events + """ + enabled = bool(_event_log_path) + size = 0 + count = 0 + + if enabled: + path = Path(_event_log_path) + if path.exists(): + try: + size = path.stat().st_size + with open(path, "r") as f: + count = sum(1 for line in f if line.strip()) + except Exception: + pass + + return { + "enabled": enabled, + "path": _event_log_path or None, + "size_bytes": size, + "event_count": count, + "max_events": _event_log_max, + } + + def create_stream_blueprint( require_api_token: Callable[[], None], ) -> Blueprint: @@ -112,4 +214,22 @@ def stream_status() -> Any: } ) + @bp.route("/stream/history", methods=["GET"]) + def stream_history() -> Any: + """Return cached events from log file for monitor hydration.""" + from flask import jsonify, request + + require_api_token() + limit = request.args.get("limit", 100, type=int) + events = get_event_history(min(limit, 500)) + return jsonify({"events": events, "count": len(events)}) + + @bp.route("/stream/log-status", methods=["GET"]) + def stream_log_status() -> Any: + """Return event log status (enabled, path, size, count).""" + from flask import jsonify + + require_api_token() + return jsonify(get_log_status()) + return bp diff --git a/static/monitor.html b/static/monitor.html index 1420434..4d3ac72 100644 --- a/static/monitor.html +++ b/static/monitor.html @@ -473,6 +473,7 @@


[stats bar hunk garbled here — the row of Events/Errors/Avg Latency/Reconnects counters gains a log-status indicator (📁) for the new event log.]
@@ -601,13 +602,15 @@


let hasConnected = false; - eventSource.onopen = () => { + eventSource.onopen = async () => { hasConnected = true; localStorage.setItem('automem_token', token); showMonitorScreen(); statusBadge.textContent = 'Connected'; statusBadge.className = 'status-badge connected'; fetchSystemStatus(); + fetchLogStatus(); + await fetchEventHistory(); // Hydrate from log file statusRefreshInterval = setInterval(fetchSystemStatus, 30000); }; @@ -653,6 +656,53 @@


} } + async function fetchEventHistory() { + if (!currentToken) return; + const headers = { 'X-API-Key': currentToken }; + + try { + const resp = await fetch('/stream/history?limit=100', { headers }); + if (!resp.ok) return; + const { events } = await resp.json(); + if (events?.length) { + // Add historical events (oldest first, so they appear in correct order) + events.forEach(event => addEvent(event, true)); + console.log(`Loaded ${events.length} historical events`); + } + } catch (err) { + console.log('No event history available'); + } + } + + async function fetchLogStatus() { + if (!currentToken) return; + const headers = { 'X-API-Key': currentToken }; + + try { + const resp = await fetch('/stream/log-status', { headers }); + if (!resp.ok) return; + const status = await resp.json(); + const el = document.getElementById('log-status-text'); + if (status.enabled) { + el.textContent = `${status.event_count} events (${formatBytes(status.size_bytes)})`; + el.style.color = 'var(--text)'; + } else { + el.textContent = 'disabled'; + el.style.color = 'var(--muted)'; + } + } catch (err) { + document.getElementById('log-status-text').textContent = '—'; + } + } + + function formatBytes(bytes) { + if (bytes === 0) return '0 B'; + const k = 1024; + const sizes = ['B', 'KB', 'MB', 'GB']; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + ' ' + sizes[i]; + } + function updateHealthCard(h) { document.getElementById('falkor-dot').className = 'status-dot ' + (h.falkordb === 'connected' ? 'green' : 'red'); document.getElementById('qdrant-dot').className = 'status-dot ' + (h.qdrant === 'connected' ? 'green' : h.qdrant === 'disconnected' ? 'red' : 'yellow'); @@ -907,7 +957,7 @@


return html; } - function addEvent(event) { + function addEvent(event, isHistorical = false) { // Remove empty state if (emptyState) emptyState.remove(); @@ -944,8 +994,9 @@


const perfType = event.type.includes('enrich') ? 'enrichment' : event.type.includes('recall') ? 'recall' : 'store'; const card = document.createElement('div'); - card.className = `event-card${isError ? ' error' : ''}`; + card.className = `event-card${isError ? ' error' : ''}${isHistorical ? ' historical' : ''}`; card.dataset.category = category; + if (isHistorical) card.style.animation = 'none'; // Skip animation for historical events const hidden = currentFilter !== 'all' && currentFilter !== category; if (hidden) card.classList.add('hidden'); From 37ad4791f55fbd370003b497e884e54ed790673e Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Sun, 11 Jan 2026 23:59:04 +0100 Subject: [PATCH 22/25] docs: Add event logging and streaming endpoints to CLAUDE.md - Document AUTOMEM_EVENT_LOG and AUTOMEM_EVENT_LOG_MAX env vars - Add Streaming (SSE) section with /stream/* endpoints - Add Web UI section with /monitor endpoint Co-Authored-By: Claude Opus 4.5 --- CLAUDE.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index a89a594..ba0fb1d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -67,6 +67,15 @@ The API (`app.py`) provides 13 endpoints: ### Health - `GET /health` - Service health check with database connectivity status +### Streaming (SSE) +- `GET /stream` - Server-Sent Events endpoint for real-time event streaming (requires auth) +- `GET /stream/status` - Return current subscriber count +- `GET /stream/history` - Return cached events from log file for monitor hydration +- `GET /stream/log-status` - Return event log status (enabled, path, size, count) + +### Web UI +- `GET /monitor` - Unified operations dashboard (serves static HTML) + ## Architecture ### Data Flow @@ -230,6 +239,10 @@ ENRICHMENT_IDLE_SLEEP_SECONDS=2 # Worker sleep when idle ENRICHMENT_FAILURE_BACKOFF_SECONDS=5 # Backoff between retries ENRICHMENT_ENABLE_SUMMARIES=true # Toggle automatic summary creation ENRICHMENT_SPACY_MODEL=en_core_web_sm # spaCy model for entity extraction + +# Event logging (for /monitor history hydration) +AUTOMEM_EVENT_LOG= # Path to JSONL log file (empty = disabled) +AUTOMEM_EVENT_LOG_MAX=500 # Max events to retain before truncation ``` Install spaCy locally to improve entity extraction: From d1bb238c6318f6280ef9db3577b52099d67b6654 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Tue, 13 Jan 2026 13:57:33 +0100 Subject: [PATCH 23/25] revert: Remove auto-fix workflow and CI prompt files Deleted .github/workflows/auto-fix.yml and .github/codex/prompts/fix-ci.md to clean up unused CI automation and prompt configuration files. --- .github/codex/prompts/fix-ci.md | 36 ---------- .github/workflows/auto-fix.yml | 122 -------------------------------- 2 files changed, 158 deletions(-) delete mode 100644 .github/codex/prompts/fix-ci.md delete mode 100644 .github/workflows/auto-fix.yml diff --git a/.github/codex/prompts/fix-ci.md b/.github/codex/prompts/fix-ci.md deleted file mode 100644 index 61ed549..0000000 --- a/.github/codex/prompts/fix-ci.md +++ /dev/null @@ -1,36 +0,0 @@ -# Fix CI Failures - AutoMem/AutoHub - -You are fixing CI failures for the AutoMem/AutoHub project. This is a Python Flask application with: -- Flask + SSE (Server-Sent Events) -- FalkorDB (graph database) -- Qdrant (vector database) -- Docker/Docker Compose deployment -- pytest for testing - -## Your Task - -Analyze the test failures and CodeRabbit comments below, then make the **minimal changes** needed to fix them. - -## Rules - -1. 
**Be surgical** - Only change what's necessary to fix the specific issue -2. **Don't refactor** - Resist the urge to "improve" adjacent code -3. **Match existing style** - Follow the patterns already in the codebase -4. **Preserve API** - Don't change endpoints or response formats unless the fix requires it -5. **Keep tests passing** - Your fix should not break other tests - -## Common Fixes - -- **Import errors**: Check relative imports, ensure modules exist -- **Type hints**: Add proper typing for function signatures -- **Async issues**: Ensure proper async/await usage -- **Database errors**: Check connection handling and query syntax -- **API errors**: Validate request/response handling - -## What NOT to Do - -- Don't add new features -- Don't refactor working code -- Don't change Docker configuration unless required -- Don't modify environment variable handling unnecessarily -- Don't add excessive logging or comments diff --git a/.github/workflows/auto-fix.yml b/.github/workflows/auto-fix.yml deleted file mode 100644 index 9d38e75..0000000 --- a/.github/workflows/auto-fix.yml +++ /dev/null @@ -1,122 +0,0 @@ -name: Auto-Fix with Codex - -on: - workflow_run: - workflows: ["CI"] - types: [completed] - workflow_dispatch: - inputs: - prompt: - description: 'Custom instructions (optional)' - required: false - type: string - branch: - description: 'Branch to fix (defaults to current)' - required: false - type: string - -jobs: - auto-fix: - if: | - github.event_name == 'workflow_dispatch' || - github.event.workflow_run.conclusion == 'failure' - runs-on: ubuntu-latest - permissions: - contents: write - pull-requests: write - actions: read - - steps: - - name: Determine branch - id: branch - run: | - if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then - BRANCH="${{ inputs.branch || github.ref_name }}" - else - BRANCH="${{ github.event.workflow_run.head_branch }}" - fi - echo "branch=$BRANCH" >> $GITHUB_OUTPUT - echo "🎯 Target branch: $BRANCH" - - - uses: actions/checkout@v4 - with: - ref: ${{ steps.branch.outputs.branch }} - fetch-depth: 0 - - - name: Setup Codex Auth - run: | - mkdir -p ~/.codex - echo '${{ secrets.CODEX_AUTH_JSON }}' > ~/.codex/auth.json - - - name: Gather failure context - id: context - if: github.event_name == 'workflow_run' - run: | - echo "📋 Fetching CI failure logs..." - gh run view ${{ github.event.workflow_run.id }} --log-failed > /tmp/ci-failures.log 2>&1 || true - - echo "📋 Fetching CodeRabbit comments..." - PR_NUMBER=$(gh pr list --head "${{ steps.branch.outputs.branch }}" --json number -q '.[0].number' 2>/dev/null || echo "") - if [ -n "$PR_NUMBER" ]; then - gh pr view "$PR_NUMBER" --json comments -q '.comments[] | select(.author.login == "coderabbitai") | .body' > /tmp/coderabbit.txt 2>&1 || true - fi - - echo "# CI Failure Context" > /tmp/context.md - if [ -s /tmp/ci-failures.log ]; then - echo "## Test Failures" >> /tmp/context.md - echo '```' >> /tmp/context.md - tail -200 /tmp/ci-failures.log >> /tmp/context.md - echo '```' >> /tmp/context.md - fi - if [ -s /tmp/coderabbit.txt ]; then - echo "## CodeRabbit Comments" >> /tmp/context.md - cat /tmp/coderabbit.txt >> /tmp/context.md - fi - env: - GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - - name: Prepare prompt - run: | - if [ -n "${{ inputs.prompt }}" ]; then - echo "${{ inputs.prompt }}" > /tmp/prompt.md - elif [ -f ".github/codex/prompts/fix-ci.md" ]; then - cat .github/codex/prompts/fix-ci.md > /tmp/prompt.md - else - cat > /tmp/prompt.md << 'PROMPT' - Fix the CI failures in this repository. 
- Make minimal, surgical changes only. - PROMPT - fi - - if [ -f /tmp/context.md ]; then - echo "" >> /tmp/prompt.md - echo "---" >> /tmp/prompt.md - cat /tmp/context.md >> /tmp/prompt.md - fi - - - name: Run Codex - uses: openai/codex-action@v1 - with: - prompt-file: /tmp/prompt.md - codex-args: '["--full-auto"]' - sandbox: workspace-write - safety-strategy: drop-sudo - - - name: Check for changes - id: changes - run: | - if git diff --quiet; then - echo "has_changes=false" >> $GITHUB_OUTPUT - else - echo "has_changes=true" >> $GITHUB_OUTPUT - git diff --stat - fi - - - name: Commit and push - if: steps.changes.outputs.has_changes == 'true' - run: | - git config user.name "codex[bot]" - git config user.email "codex[bot]@users.noreply.github.com" - git add . - git commit -m "fix: auto-fix CI failures [skip ci]" - git push From 151305ee2a22787c1974b3ff7b727e30336877a5 Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Tue, 13 Jan 2026 14:18:06 +0100 Subject: [PATCH 24/25] fix: address CodeRabbit review feedback --- app.py | 30 ++- automem/api/stream.py | 83 ++++--- requirements.txt | 2 +- scripts/automem_watch.py | 2 +- static/monitor.html | 489 +++++++++++++++++++++++---------------- 5 files changed, 376 insertions(+), 230 deletions(-) diff --git a/app.py b/app.py index bc9531c..67df14b 100644 --- a/app.py +++ b/app.py @@ -1030,9 +1030,12 @@ def extract_entities(content: str) -> Dict[str, List[str]]: class EnrichmentStats: processed_total: int = 0 successes: int = 0 + skips: int = 0 failures: int = 0 last_success_id: Optional[str] = None last_success_at: Optional[str] = None + last_skip_id: Optional[str] = None + last_skip_at: Optional[str] = None last_error: Optional[str] = None last_error_at: Optional[str] = None @@ -1042,6 +1045,12 @@ def record_success(self, memory_id: str) -> None: self.last_success_id = memory_id self.last_success_at = utc_now() + def record_skip(self, memory_id: str) -> None: + self.processed_total += 1 + self.skips += 1 + self.last_skip_id = memory_id + self.last_skip_at = utc_now() + def record_failure(self, error: str) -> None: self.processed_total += 1 self.failures += 1 @@ -1052,9 +1061,12 @@ def to_dict(self) -> Dict[str, Any]: return { "processed_total": self.processed_total, "successes": self.successes, + "skips": self.skips, "failures": self.failures, "last_success_id": self.last_success_id, "last_success_at": self.last_success_at, + "last_skip_id": self.last_skip_id, + "last_skip_at": self.last_skip_at, "last_error": self.last_error, "last_error_at": self.last_error_at, } @@ -1146,6 +1158,8 @@ def require_api_token() -> None: # Allow unauthenticated health checks and monitor page endpoint = request.endpoint or "" + if endpoint == "static" or request.path.startswith("/static/"): + return if endpoint.endswith("health") or request.path in ("/health", "/monitor"): return @@ -1768,7 +1782,10 @@ def enrichment_worker() -> None: try: result = enrich_memory(job.memory_id, forced=job.forced) - state.enrichment_stats.record_success(job.memory_id) + if result.get("processed", False): + state.enrichment_stats.record_success(job.memory_id) + else: + state.enrichment_stats.record_skip(job.memory_id) elapsed_ms = int((time.perf_counter() - enrich_start) * 1000) emit_event( @@ -3035,17 +3052,22 @@ def recall_memories() -> Any: # Build result summaries for top 10 results for r in results[:10]: - mem = r.get("memory", {}) + mem = r.get("memory") or {} + if not isinstance(mem, dict): + mem = {} + tags_list = mem.get("tags") or [] result_summaries.append( { "id": 
str(r.get("id", ""))[:8], "score": round(float(r.get("final_score", 0)), 3), "type": mem.get("type", "?"), "content_length": len(mem.get("content", "")), - "tags_count": len(mem.get("tags", [])), + "tags_count": ( + len(tags_list) if isinstance(tags_list, (list, tuple)) else 0 + ), } ) - except Exception as e: + except (TypeError, ValueError, AttributeError) as e: logger.debug("Failed to parse response for result_count", exc_info=e) # Compute aggregate stats diff --git a/automem/api/stream.py b/automem/api/stream.py index a6c6115..547f3db 100644 --- a/automem/api/stream.py +++ b/automem/api/stream.py @@ -10,7 +10,9 @@ from __future__ import annotations import json +import logging import os +from collections import deque from pathlib import Path from queue import Empty, Full, Queue from threading import Lock @@ -26,6 +28,25 @@ _event_log_path = os.getenv("AUTOMEM_EVENT_LOG", "") _event_log_max = int(os.getenv("AUTOMEM_EVENT_LOG_MAX", "500")) _event_log_lock = Lock() +_logger = logging.getLogger(__name__) + + +def _get_event_log_path() -> Path | None: + if not _event_log_path: + return None + + raw_path = Path(_event_log_path) + if not raw_path.is_absolute(): + if ".." in raw_path.parts: + _logger.warning("Refusing AUTOMEM_EVENT_LOG path traversal: %s", _event_log_path) + return None + raw_path = Path.cwd() / raw_path + + try: + return raw_path.resolve(strict=False) + except (OSError, RuntimeError, ValueError) as exc: + _logger.warning("Invalid AUTOMEM_EVENT_LOG path %s: %s", _event_log_path, exc) + return None def _write_event_to_log(event: Dict[str, Any]) -> None: @@ -33,32 +54,36 @@ def _write_event_to_log(event: Dict[str, Any]) -> None: Thread-safe. Only writes if AUTOMEM_EVENT_LOG is set. """ - if not _event_log_path: + path = _get_event_log_path() + if path is None: return with _event_log_lock: - path = Path(_event_log_path) - path.parent.mkdir(parents=True, exist_ok=True) + try: + path.parent.mkdir(parents=True, exist_ok=True) + except OSError as exc: + _logger.warning("Failed to create event log directory %s: %s", path.parent, exc) + return - # Read existing events - events = [] + events: deque[str] = deque(maxlen=max(_event_log_max, 1)) if path.exists(): try: with open(path, "r") as f: - events = [line.strip() for line in f if line.strip()] - except Exception: - events = [] + for line in f: + stripped = line.strip() + if stripped: + events.append(stripped) + except OSError as exc: + _logger.warning("Failed to read event log %s: %s", path, exc) # Append new event events.append(json.dumps(event)) - # Truncate to max - if len(events) > _event_log_max: - events = events[-_event_log_max:] - - # Write back - with open(path, "w") as f: - f.write("\n".join(events) + "\n") + try: + with open(path, "w") as f: + f.write("\n".join(events) + "\n") + except OSError as exc: + _logger.warning("Failed to write event log %s: %s", path, exc) def emit_event(event_type: str, data: Dict[str, Any], utc_now: Callable[[], str]) -> None: @@ -106,10 +131,10 @@ def get_event_history(limit: int = 100) -> List[Dict[str, Any]]: Returns: List of event dictionaries, oldest first """ - if not _event_log_path: + path = _get_event_log_path() + if path is None: return [] - path = Path(_event_log_path) if not path.exists(): return [] @@ -119,7 +144,8 @@ def get_event_history(limit: int = 100) -> List[Dict[str, Any]]: lines = [line.strip() for line in f if line.strip()] # Return last N events return [json.loads(line) for line in lines[-limit:]] - except Exception: + except (OSError, json.JSONDecodeError, TypeError, 
ValueError) as exc: + _logger.warning("Failed to load event history from %s: %s", path, exc) return [] @@ -129,23 +155,22 @@ def get_log_status() -> Dict[str, Any]: Returns: Dict with enabled, path, size_bytes, event_count, max_events """ - enabled = bool(_event_log_path) + path = _get_event_log_path() + enabled = bool(path) size = 0 count = 0 - if enabled: - path = Path(_event_log_path) - if path.exists(): - try: - size = path.stat().st_size - with open(path, "r") as f: - count = sum(1 for line in f if line.strip()) - except Exception: - pass + if path and path.exists(): + try: + size = path.stat().st_size + with open(path, "r") as f: + count = sum(1 for line in f if line.strip()) + except OSError: + pass return { "enabled": enabled, - "path": _event_log_path or None, + "path": str(path) if path else None, "size_bytes": size, "event_count": count, "max_events": _event_log_max, diff --git a/requirements.txt b/requirements.txt index e635b73..8aeb6d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ # requirements.txt - Updated versions for 2024/2025 flask==3.0.3 -flask-cors==4.0.0 +flask-cors==6.0.0 falkordb==1.0.9 qdrant-client==1.11.3 python-dotenv==1.0.1 diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index e48be2e..a278d36 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -430,7 +430,7 @@ def stream_events(url: str, token: str, max_reconnects: int = 0) -> None: if is_ssl_error: consecutive_ssl_errors += 1 # Exponential backoff for SSL errors: 2s, 4s, 8s, 16s, max 30s - backoff = min(30, 2 ** min(consecutive_ssl_errors, 4)) + backoff = min(30, 2**consecutive_ssl_errors) console.print( f"[yellow]SSL connection reset[/] [dim](Railway LB, backoff {backoff}s)[/]" ) diff --git a/static/monitor.html b/static/monitor.html index 4d3ac72..b94856d 100644 --- a/static/monitor.html +++ b/static/monitor.html @@ -505,16 +505,70 @@
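For readers tracking the event-log hardening above: the truncation in `_write_event_to_log` boils down to a bounded JSONL append — load the existing lines into a `deque(maxlen=…)`, push the new event, rewrite the file. A minimal standalone sketch of that pattern (the helper name is mine and locking/error handling are omitted; the 500-line cap mirrors the `AUTOMEM_EVENT_LOG_MAX` default):

```python
import json
from collections import deque
from pathlib import Path


def append_bounded_jsonl(path: Path, event: dict, max_events: int = 500) -> None:
    """Append one event as a JSON line, keeping only the newest max_events lines."""
    events: deque[str] = deque(maxlen=max(max_events, 1))
    if path.exists():
        for line in path.read_text().splitlines():
            if line.strip():
                events.append(line.strip())  # older lines fall off the left end
    events.append(json.dumps(event))
    path.write_text("\n".join(events) + "\n")
```

Rewriting the whole file per event is O(n), which stays cheap at the default 500-event cap and keeps the log self-truncating.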
AutoMem Monitor
enrichment: { warn: 2000, slow: 5000 }, contentSize: { warn: 1000, large: 2000 }, typeConfidence: { low: 0.5 } - }; - - // State - let eventSource = null; - let currentToken = null; - let eventHistory = []; - let errorCount = 0; - let reconnectCount = 0; - let currentFilter = 'all'; - let statusRefreshInterval = null; + }; + + // State + let sseAbortController = null; + let currentToken = null; + let eventHistory = []; + let errorCount = 0; + let reconnectCount = 0; + let currentFilter = 'all'; + let statusRefreshInterval = null; + + function escapeHtml(text) { + const div = document.createElement('div'); + div.textContent = text == null ? '' : String(text); + return div.innerHTML; + } + + function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + async function readSseStream(stream, onMessage, signal) { + const reader = stream.getReader(); + const decoder = new TextDecoder('utf-8'); + let buffer = ''; + let eventName = ''; + let dataBuffer = ''; + + while (true) { + if (signal?.aborted) { + try { reader.cancel(); } catch (_) {} + return; + } + const { value, done } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split(/\r?\n/); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line === '') { + if (dataBuffer) { + const payload = dataBuffer.endsWith('\n') ? dataBuffer.slice(0, -1) : dataBuffer; + onMessage(payload, eventName || 'message'); + } + eventName = ''; + dataBuffer = ''; + continue; + } + if (line.startsWith(':')) { + continue; + } + if (line.startsWith('event:')) { + eventName = line.slice(6).trim(); + continue; + } + if (line.startsWith('data:')) { + dataBuffer += line.slice(5).replace(/^\s*/, '') + '\n'; + continue; + } + } + } + } // Elements const loginScreen = document.getElementById('login-screen'); @@ -578,64 +632,109 @@
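The `readSseStream` helper above hand-rolls the SSE framing rules: a blank line terminates an event, `:` lines are keep-alive comments, `event:` sets the event name, and `data:` lines accumulate with trailing newlines. A rough Python equivalent of the same framing, for illustration only (`parse_sse_lines` is a hypothetical helper, not part of this patch):

```python
from typing import Iterable, Iterator, Tuple


def parse_sse_lines(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_name, data) pairs from already-split SSE lines."""
    event_name, data_buffer = "", ""
    for line in lines:
        if line == "":  # blank line dispatches the buffered event
            if data_buffer:
                yield (event_name or "message", data_buffer.rstrip("\n"))
            event_name, data_buffer = "", ""
        elif line.startswith(":"):  # comment / keep-alive
            continue
        elif line.startswith("event:"):
            event_name = line[6:].strip()
        elif line.startswith("data:"):
            data_buffer += line[5:].lstrip() + "\n"
```

Feeding it `["event: memory.store", "data: {}", ""]` yields `("memory.store", "{}")`.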
AutoMem Monitor
loginError.style.display = 'block'; } - function disconnect() { - if (eventSource) { - eventSource.close(); - eventSource = null; - } - if (statusRefreshInterval) { - clearInterval(statusRefreshInterval); - statusRefreshInterval = null; - } - localStorage.removeItem('automem_token'); + function disconnect() { + if (sseAbortController) { + sseAbortController.abort(); + sseAbortController = null; + } + if (statusRefreshInterval) { + clearInterval(statusRefreshInterval); + statusRefreshInterval = null; + } + localStorage.removeItem('automem_token'); currentToken = null; } - function tryConnect(token) { - connectBtn.disabled = true; - connectBtn.textContent = 'Connecting...'; - loginError.style.display = 'none'; - currentToken = token; - - const baseUrl = window.location.origin; - eventSource = new EventSource(`${baseUrl}/stream?api_key=${token}`); - - let hasConnected = false; - - eventSource.onopen = async () => { - hasConnected = true; - localStorage.setItem('automem_token', token); - showMonitorScreen(); - statusBadge.textContent = 'Connected'; - statusBadge.className = 'status-badge connected'; - fetchSystemStatus(); - fetchLogStatus(); - await fetchEventHistory(); // Hydrate from log file - statusRefreshInterval = setInterval(fetchSystemStatus, 30000); - }; - - eventSource.onmessage = (e) => { - try { - const event = JSON.parse(e.data); - addEvent(event); - } catch (err) { - console.error('Failed to parse event:', e.data); - } - }; - - eventSource.onerror = () => { - if (!hasConnected) { - eventSource.close(); - eventSource = null; - showLoginScreen('Invalid token or connection failed'); - } else { - statusBadge.textContent = 'Reconnecting...'; - statusBadge.className = 'status-badge disconnected'; - reconnectCount++; - document.getElementById('reconnect-count').textContent = reconnectCount; - } - }; - } + function tryConnect(token) { + connectBtn.disabled = true; + connectBtn.textContent = 'Connecting...'; + loginError.style.display = 'none'; + currentToken = token; + + const baseUrl = window.location.origin; + let hasConnected = false; + let backoffMs = 1000; + + if (sseAbortController) { + sseAbortController.abort(); + sseAbortController = null; + } + const controller = new AbortController(); + sseAbortController = controller; + + const onOpen = async (isReconnect) => { + if (!isReconnect) { + localStorage.setItem('automem_token', token); + showMonitorScreen(); + } + statusBadge.textContent = 'Connected'; + statusBadge.className = 'status-badge connected'; + fetchSystemStatus(); + fetchLogStatus(); + if (!statusRefreshInterval) { + statusRefreshInterval = setInterval(fetchSystemStatus, 30000); + } + if (!isReconnect) { + await fetchEventHistory(); // Hydrate from log file + } + }; + + const onMessage = (payload) => { + try { + const event = JSON.parse(payload); + addEvent(event); + } catch (err) { + console.error('Failed to parse event:', payload); + } + }; + + const connectOnce = async (isReconnect) => { + const headers = { + 'Accept': 'text/event-stream', + 'Authorization': `Bearer ${token}`, + }; + const resp = await fetch(`${baseUrl}/stream`, { + headers, + signal: controller.signal, + cache: 'no-store', + }); + if (!resp.ok) { + throw new Error(`HTTP ${resp.status}`); + } + if (!hasConnected) { + hasConnected = true; + } + await onOpen(isReconnect); + await readSseStream(resp.body, onMessage, controller.signal); + }; + + (async () => { + while (!controller.signal.aborted && currentToken === token) { + try { + await connectOnce(hasConnected); + if (controller.signal.aborted) 
return; + throw new Error('Stream ended'); + } catch (err) { + if (controller.signal.aborted) return; + if (!hasConnected) { + showLoginScreen('Invalid token or connection failed'); + connectBtn.disabled = false; + connectBtn.textContent = 'Connect →'; + currentToken = null; + sseAbortController = null; + return; + } + statusBadge.textContent = 'Reconnecting...'; + statusBadge.className = 'status-badge disconnected'; + reconnectCount++; + document.getElementById('reconnect-count').textContent = reconnectCount; + await sleep(backoffMs); + backoffMs = Math.min(backoffMs * 2, 30000); + } + } + })(); + + } async function fetchSystemStatus() { if (!currentToken) return; @@ -815,55 +914,55 @@
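The rewritten `tryConnect` above swaps `EventSource` (which cannot attach an `Authorization` header) for `fetch` plus a manual reconnect loop with capped exponential backoff. For tailing the same stream outside the browser, a comparable Python loop might look like this — the URL, token handling, and the `requests` dependency are assumptions, not part of the patch:

```python
import time

import requests


def follow_stream(base_url: str, token: str) -> None:
    """Print data lines from /stream, reconnecting with capped exponential backoff."""
    backoff = 1.0
    while True:
        try:
            with requests.get(
                f"{base_url}/stream",
                headers={"Authorization": f"Bearer {token}", "Accept": "text/event-stream"},
                stream=True,
                timeout=(5, None),  # bound the connect, leave the read open
            ) as resp:
                resp.raise_for_status()
                backoff = 1.0  # connected: start the backoff ladder over
                for line in resp.iter_lines(decode_unicode=True):
                    if line and line.startswith("data:"):
                        print(line[5:].strip())
        except requests.RequestException:
            time.sleep(backoff)
            backoff = min(backoff * 2, 30.0)
```

Unlike the JS above, this sketch resets the backoff after every successful connection; the monitor lets it keep growing across reconnects within one session.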
AutoMem Monitor
return badges; } - function getSummary(event) { - const d = event.data || {}; - const type = event.type; - - switch (type) { - case 'memory.store': - return truncate(d.content, 60) || d.memory_id?.slice(0, 8); - case 'memory.recall': - return `"${truncate(d.query, 40)}" → ${d.result_count || 0} results`; - case 'memory.associate': - return `${d.memory1_id?.slice(0,8)} → ${d.memory2_id?.slice(0,8)} (${d.relation_type})`; - case 'enrichment.start': - return `${d.memory_id?.slice(0,8)} attempt ${d.attempt || 1}`; - case 'enrichment.complete': - const tagsAdded = d.tags_added?.length || 0; - return `${d.memory_id?.slice(0,8)} enriched` + (tagsAdded ? ` (+${tagsAdded} tags)` : ''); - case 'enrichment.failed': - return `${d.memory_id?.slice(0,8)}: ${truncate(d.error, 50)}`; - case 'consolidation.run': - return `${d.task_type} → ${d.affected_count || 0} memories`; - default: - return truncate(JSON.stringify(d), 60); - } - } + function getSummary(event) { + const d = event.data || {}; + const type = event.type; + + switch (type) { + case 'memory.store': + return escapeHtml(truncate(d.content, 60) || String(d.memory_id || '').slice(0, 8)); + case 'memory.recall': + return escapeHtml(`"${truncate(d.query, 40)}" → ${d.result_count || 0} results`); + case 'memory.associate': + return escapeHtml(`${String(d.memory1_id || '').slice(0,8)} → ${String(d.memory2_id || '').slice(0,8)} (${d.relation_type})`); + case 'enrichment.start': + return escapeHtml(`${String(d.memory_id || '').slice(0,8)} attempt ${d.attempt || 1}`); + case 'enrichment.complete': + const tagsAdded = d.tags_added?.length || 0; + return escapeHtml(`${String(d.memory_id || '').slice(0,8)} enriched` + (tagsAdded ? ` (+${tagsAdded} tags)` : '')); + case 'enrichment.failed': + return escapeHtml(`${String(d.memory_id || '').slice(0,8)}: ${truncate(d.error, 50)}`); + case 'consolidation.run': + return escapeHtml(`${d.task_type} → ${d.affected_count || 0} memories`); + default: + return escapeHtml(truncate(JSON.stringify(d), 60)); + } + } function renderEventBody(event) { const d = event.data || {}; const type = event.type; let html = ''; - switch (type) { - case 'memory.store': - html += `
- Type: ${d.type || 'Memory'} (${Math.round((d.type_confidence || 0) * 100)}%) - Importance: ${d.importance || 0} - Size: ${d.content?.length || 0} chars -
`; - if (d.content) { - html += `
${truncate(d.content, 200)}
`; - } - if (d.tags?.length) { - html += `
${d.tags.map(t => `${t}`).join('')}
`; - } - break; - - case 'memory.recall': - html += `
- Query: "${truncate(d.query, 50)}" -
`; + switch (type) { + case 'memory.store': + html += `
+ Type: ${escapeHtml(d.type || 'Memory')} (${Math.round((d.type_confidence || 0) * 100)}%) + Importance: ${d.importance || 0} + Size: ${d.content?.length || 0} chars +
`; + if (d.content) { + html += `
${escapeHtml(truncate(d.content, 200))}
`; + } + if (d.tags?.length) { + html += `
${d.tags.map(t => `${escapeHtml(t)}`).join('')}
`; + } + break; + + case 'memory.recall': + html += `
+ Query: "${escapeHtml(truncate(d.query, 50))}" +
`; html += `
Results: ${d.result_count || 0} Vector: ${d.vector_search ? '✓' : '✗'} @@ -875,84 +974,84 @@
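The recall branch above is driven by the payload that `recall.py` emits later in this series (PATCH 25/25). An illustrative `memory.recall` event in that shape — every value below is made up:

```python
# Illustrative only: field names follow the emit_event call in automem/api/recall.py;
# the values are invented for the example.
recall_event = {
    "type": "memory.recall",
    "timestamp": "2026-01-13T14:00:00Z",
    "data": {
        "query": "deployment checklist",
        "result_count": 4,
        "dedup_removed": 1,
        "elapsed_ms": 112,
        "vector_search": True,
        "has_time_filter": False,
        "tags_filter": ["ops"],
        "stats": {"avg_length": 312, "avg_tags": 2.5, "score_range": [0.41, 0.87]},
        "result_summaries": [
            {"id": "a1b2c3d4", "score": 0.87, "type": "Decision", "content_length": 420, "tags_count": 3},
            {"id": "e5f6a7b8", "score": 0.41, "type": "Memory", "content_length": 204, "tags_count": 2},
        ],
    },
}
```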
AutoMem Monitor
Avg length: ${Math.round(d.stats.avg_length || 0)} chars
`; } - if (d.tags_filter?.length) { - html += `
${d.tags_filter.map(t => `${t}`).join('')}
`; - } - break; - - case 'enrichment.complete': - if (d.tags_added?.length) { - html += `
${d.tags_added.map(t => `+${t}`).join('')}
`; - } - if (d.entities && Object.keys(d.entities).length) { - html += `
`; - for (const [cat, vals] of Object.entries(d.entities)) { - if (vals?.length) { - html += `${cat}: ${vals.join(', ')}`; - } - } - html += `
`; - } + if (d.tags_filter?.length) { + html += `
${d.tags_filter.map(t => `${escapeHtml(t)}`).join('')}
`; + } + break; + + case 'enrichment.complete': + if (d.tags_added?.length) { + html += `
${d.tags_added.map(t => `+${escapeHtml(t)}`).join('')}
`; + } + if (d.entities && Object.keys(d.entities).length) { + html += `
`; + for (const [cat, vals] of Object.entries(d.entities)) { + if (vals?.length) { + html += `${escapeHtml(cat)}: ${escapeHtml(vals.join(', '))}`; + } + } + html += `
`; + } if (d.temporal_links?.length || d.semantic_neighbors?.length) { html += ``; - } - if (d.patterns_detected?.length) { - html += `
`; - for (const p of d.patterns_detected) { - html += `pattern: ${p.type} (${p.similar_memories} similar)`; - } - html += `
`; - } - break; - - case 'enrichment.failed': - html += `
${d.error || 'Unknown error'}
`; - html += `
- Attempt: ${d.attempt || '?'} - Will retry: ${d.will_retry ? 'Yes' : 'No'} -
`; - break; - - case 'memory.associate': - html += `
- From: ${d.memory1_id || '?'} - To: ${d.memory2_id || '?'} -
`; - html += `
- Type: ${d.relation_type || '?'} - Strength: ${d.strength || '?'} -
`; - break; - - case 'consolidation.run': - html += `
- Task: ${d.task_type || '?'} - Affected: ${d.affected_count || 0} memories - Success: ${d.success ? '✓' : '✗'} -
`; - break; - - default: - html += `
${JSON.stringify(d)}
`; - } - - // Raw JSON toggle - html += `
- Raw JSON -
${JSON.stringify(event, null, 2)}
-
`; + if (d.temporal_links?.length) { + const ids = d.temporal_links.map(id => String(id).slice(0,8)); + html += ``; + } + if (d.semantic_neighbors?.length) { + const neighbors = d.semantic_neighbors.map(item => { + const [id, score] = Array.isArray(item) ? item : [item, null]; + return `${escapeHtml(String(id).slice(0,8))}` + (score ? ` (${Number(score).toFixed(2)})` : ''); + }); + html += ``; + } + html += `
`; + } + if (d.patterns_detected?.length) { + html += `
`; + for (const p of d.patterns_detected) { + html += `pattern: ${escapeHtml(p.type)} (${p.similar_memories} similar)`; + } + html += `
`; + } + break; + + case 'enrichment.failed': + html += `
${escapeHtml(d.error || 'Unknown error')}
`; + html += `
+ Attempt: ${d.attempt || '?'} + Will retry: ${d.will_retry ? 'Yes' : 'No'} +
`; + break; + + case 'memory.associate': + html += `
+ From: ${escapeHtml(d.memory1_id || '?')} + To: ${escapeHtml(d.memory2_id || '?')} +
`; + html += `
+ Type: ${escapeHtml(d.relation_type || '?')} + Strength: ${d.strength || '?'} +
`; + break; + + case 'consolidation.run': + html += `
+ Task: ${escapeHtml(d.task_type || '?')} + Affected: ${d.affected_count || 0} memories + Success: ${d.success ? '✓' : '✗'} +
`; + break; + + default: + html += `
${escapeHtml(JSON.stringify(d))}
`; + } + + // Raw JSON toggle + html += `
+ Raw JSON +
${escapeHtml(JSON.stringify(event, null, 2))}
+
`; return html; } @@ -987,9 +1086,9 @@
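For the enrichment branch earlier in `renderEventBody`, the expected shape is an entity map plus link and pattern lists; an illustrative `enrichment.complete` payload as that branch reads it (values invented; `semantic_neighbors` entries are `[id, score]` pairs, matching the renderer's destructuring):

```python
# Illustrative only: shaped the way the monitor's enrichment branch reads it.
enrichment_event = {
    "type": "enrichment.complete",
    "timestamp": "2026-01-13T14:05:00Z",
    "data": {
        "memory_id": "a1b2c3d4-1111-2222-3333-444455556666",
        "elapsed_ms": 840,
        "tags_added": ["project:automem"],
        "entities": {"tools": ["qdrant", "falkordb"], "people": ["jack"]},
        "temporal_links": ["e5f6a7b8-aaaa-bbbb-cccc-ddddeeeeffff"],
        "semantic_neighbors": [["9a8b7c6d-0000-1111-2222-333344445555", 0.91]],
        "patterns_detected": [{"type": "recurring-error", "similar_memories": 3}],
    },
}
```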
AutoMem Monitor
// Create event card const category = getEventCategory(event.type); - const typeClass = getTypeClass(event.type); - const typeName = event.type.split('.').pop().toUpperCase(); - const d = event.data || {}; + const typeClass = getTypeClass(event.type); + const typeName = event.type.split('.').pop().toUpperCase(); + const d = event.data || {}; const ms = d.elapsed_ms; const perfType = event.type.includes('enrich') ? 'enrichment' : event.type.includes('recall') ? 'recall' : 'store'; @@ -1001,14 +1100,14 @@
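The card header logic above derives a display label and a performance bucket from the event type string; a small Python sketch of the same mapping (`describe_event` is hypothetical, shown only to make the rule explicit):

```python
def describe_event(event_type: str) -> tuple[str, str]:
    """Return (display label, perf bucket) the way the monitor card header does."""
    label = event_type.split(".")[-1].upper()
    if "enrich" in event_type:
        bucket = "enrichment"
    elif "recall" in event_type:
        bucket = "recall"
    else:
        bucket = "store"
    return label, bucket


assert describe_event("enrichment.complete") == ("COMPLETE", "enrichment")
assert describe_event("memory.recall") == ("RECALL", "recall")
assert describe_event("memory.store") == ("STORE", "store")
```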
AutoMem Monitor
const hidden = currentFilter !== 'all' && currentFilter !== category; if (hidden) card.classList.add('hidden'); - card.innerHTML = ` -
- ${typeName} - ${getSummary(event)}${getQualityBadges(d, event.type)} - ${getPerfBadge(ms, perfType)} - ${formatTime(event.timestamp)} - -
+ card.innerHTML = ` +
+ ${escapeHtml(typeName)} + ${getSummary(event)}${getQualityBadges(d, event.type)} + ${getPerfBadge(ms, perfType)} + ${formatTime(event.timestamp)} + +
${renderEventBody(event)}
`; From 0615de7c9cbf131815b1759c39e0796da4cc973e Mon Sep 17 00:00:00 2001 From: Jack Arturo Date: Tue, 13 Jan 2026 20:41:12 +0100 Subject: [PATCH 25/25] fix(monitor): emit store/recall/associate events --- automem/api/memory.py | 30 ++++++++++++++ automem/api/recall.py | 91 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/automem/api/memory.py b/automem/api/memory.py index 1e4b0d7..8548a1b 100644 --- a/automem/api/memory.py +++ b/automem/api/memory.py @@ -7,6 +7,7 @@ from flask import Blueprint, abort, jsonify, request +from automem.api.stream import emit_event from automem.config import ( CLASSIFICATION_MODEL, MEMORY_AUTO_SUMMARIZE, @@ -346,6 +347,23 @@ def store() -> Any: "enrichment_queued": bool(state.enrichment_queue), }, ) + + emit_event( + "memory.store", + { + "memory_id": memory_id, + "content": content, + "type": memory_type, + "type_confidence": type_confidence, + "importance": importance, + "tags": tags, + "metadata": metadata, + "embedding_status": embedding_status, + "qdrant_status": qdrant_result, + "elapsed_ms": response["query_time_ms"], + }, + utc_now, + ) return jsonify(response), 201 @bp.route("/memory/", methods=["PATCH"]) @@ -611,6 +629,18 @@ def associate() -> Any: for prop in relation_config.get("properties", []): if prop in relationship_props: response[prop] = relationship_props[prop] + + emit_event( + "memory.associate", + { + "memory1_id": memory1_id, + "memory2_id": memory2_id, + "relation_type": relation_type, + "strength": strength, + "properties": relationship_props, + }, + utc_now, + ) return jsonify(response), 201 return bp diff --git a/automem/api/recall.py b/automem/api/recall.py index 82cb839..c448140 100644 --- a/automem/api/recall.py +++ b/automem/api/recall.py @@ -8,8 +8,10 @@ from flask import Blueprint, abort, jsonify, request +from automem.api.stream import emit_event from automem.config import ALLOWED_RELATIONS, RECALL_EXPANSION_LIMIT, RECALL_RELATION_LIMIT from automem.utils.graph import _serialize_node +from automem.utils.time import utc_now DEFAULT_STYLE_PRIORITY_TAGS: Set[str] = { "coding-style", @@ -1457,7 +1459,8 @@ def create_recall_blueprint( @bp.route("/recall", methods=["GET"]) def recall_memories() -> Any: - return handle_recall( + query_start = time.perf_counter() + response = handle_recall( get_memory_graph, get_qdrant_client, normalize_tag_list, @@ -1476,6 +1479,92 @@ def recall_memories() -> Any: expansion_limit_default=RECALL_EXPANSION_LIMIT, on_access=on_access, ) + elapsed_ms = int((time.perf_counter() - query_start) * 1000) + + result_count = 0 + dedup_removed = 0 + tags_filter: List[str] = [] + has_time_filter = False + vector_search_used = False + query_text = "" + result_summaries: List[Dict[str, Any]] = [] + + try: + data = response.get_json(silent=True) or {} + query_text = data.get("query") or "" + dedup_removed = int(data.get("dedup_removed") or 0) + tags_filter = data.get("tags") or [] + has_time_filter = bool(data.get("time_window")) + vector_info = data.get("vector_search") or {} + vector_search_used = bool(vector_info.get("matched")) or ( + bool(vector_info.get("enabled")) and bool(query_text) + ) + + results = data.get("results") or [] + result_count = int(data.get("count") or len(results)) + + for r in results[:10]: + if not isinstance(r, dict): + continue + memory = r.get("memory") or r.get("node") or {} + if not isinstance(memory, dict): + memory = {} + + tags_list = memory.get("tags") or [] + score_raw = r.get("final_score", r.get("score", 0)) + try: + 
score_val = round(float(score_raw or 0), 3) + except (TypeError, ValueError): + score_val = 0.0 + + result_summaries.append( + { + "id": str(r.get("id") or memory.get("id") or "")[:8], + "score": score_val, + "type": memory.get("type", "?"), + "content_length": len(memory.get("content", "") or ""), + "tags_count": len(tags_list) if isinstance(tags_list, (list, tuple)) else 0, + } + ) + except (AttributeError, TypeError, ValueError) as exc: + logger.debug("Failed to parse recall response for SSE stats", exc_info=exc) + + avg_length = 0 + avg_tags = 0 + score_min = 0 + score_max = 0 + if result_summaries: + avg_length = int( + sum(r["content_length"] for r in result_summaries) / len(result_summaries) + ) + avg_tags = round( + sum(r["tags_count"] for r in result_summaries) / len(result_summaries), 1 + ) + scores = [r["score"] for r in result_summaries] + score_min = min(scores) + score_max = max(scores) + + emit_event( + "memory.recall", + { + "query": query_text, + "result_count": result_count, + "dedup_removed": dedup_removed, + "elapsed_ms": elapsed_ms, + "tags_filter": tags_filter if tags_filter else [], + "has_time_filter": has_time_filter, + "vector_search": vector_search_used, + "stats": { + "avg_length": avg_length, + "avg_tags": avg_tags, + "score_range": [score_min, score_max], + }, + "result_summaries": result_summaries[:3], + }, + utc_now, + ) + + return response @bp.route("/startup-recall", methods=["GET"]) def startup_recall() -> Any:
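As a quick sanity check on the aggregate stats computed above — same formulas, made-up summaries rather than repository data:

```python
result_summaries = [
    {"id": "a1b2c3d4", "score": 0.82, "type": "Decision", "content_length": 100, "tags_count": 2},
    {"id": "e5f6a7b8", "score": 0.41, "type": "Memory", "content_length": 300, "tags_count": 3},
]

avg_length = int(sum(r["content_length"] for r in result_summaries) / len(result_summaries))
avg_tags = round(sum(r["tags_count"] for r in result_summaries) / len(result_summaries), 1)
scores = [r["score"] for r in result_summaries]

assert (avg_length, avg_tags) == (200, 2.5)
assert [min(scores), max(scores)] == [0.41, 0.82]
```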