diff --git a/CLAUDE.md b/CLAUDE.md index a89a594..ba0fb1d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -67,6 +67,15 @@ The API (`app.py`) provides 13 endpoints: ### Health - `GET /health` - Service health check with database connectivity status +### Streaming (SSE) +- `GET /stream` - Server-Sent Events endpoint for real-time event streaming (requires auth) +- `GET /stream/status` - Return current subscriber count +- `GET /stream/history` - Return cached events from log file for monitor hydration +- `GET /stream/log-status` - Return event log status (enabled, path, size, count) + +### Web UI +- `GET /monitor` - Unified operations dashboard (serves static HTML) + ## Architecture ### Data Flow @@ -230,6 +239,10 @@ ENRICHMENT_IDLE_SLEEP_SECONDS=2 # Worker sleep when idle ENRICHMENT_FAILURE_BACKOFF_SECONDS=5 # Backoff between retries ENRICHMENT_ENABLE_SUMMARIES=true # Toggle automatic summary creation ENRICHMENT_SPACY_MODEL=en_core_web_sm # spaCy model for entity extraction + +# Event logging (for /monitor history hydration) +AUTOMEM_EVENT_LOG= # Path to JSONL log file (empty = disabled) +AUTOMEM_EVENT_LOG_MAX=500 # Max events to retain before truncation ``` Install spaCy locally to improve entity extraction: diff --git a/app.py b/app.py index c40b08b..67df14b 100644 --- a/app.py +++ b/app.py @@ -28,6 +28,7 @@ from falkordb import FalkorDB from flask import Blueprint, Flask, abort, jsonify, request +from flask_cors import CORS from qdrant_client import QdrantClient from qdrant_client import models as qdrant_models @@ -110,7 +111,8 @@ def __init__(self, id: str, vector: List[float], payload: Dict[str, Any]): if str(root) not in sys.path: sys.path.insert(0, str(root)) -app = Flask(__name__) +app = Flask(__name__, static_folder="static") +CORS(app) # Enable CORS for Graph Viewer and other cross-origin clients # Legacy blueprint placeholders for deprecated route definitions below. # These are not registered with the app and are safe to keep until full removal. 
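For quick validation of the Streaming endpoints documented in the CLAUDE.md hunk above, a minimal SSE consumer is enough. The sketch below is an assumption-laden example, not part of the diff: it assumes a local instance on port 8001 and a token in `AUTOMEM_API_TOKEN`, reuses `httpx` (already a dependency of `scripts/automem_watch.py`), and only handles the single-line `data:` frames that `emit_event` produces.

```python
# Minimal /stream consumer sketch (assumed local AutoMem on :8001 and a bearer token).
import json
import os

import httpx

BASE_URL = os.environ.get("AUTOMEM_URL", "http://localhost:8001")  # assumption
TOKEN = os.environ.get("AUTOMEM_API_TOKEN", "dev")  # assumption

# Finite connect/write timeouts, unlimited read so the SSE stream stays open.
timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=10.0)
headers = {"Authorization": f"Bearer {TOKEN}"}

with httpx.Client(timeout=timeout) as client:
    with client.stream("GET", f"{BASE_URL}/stream", headers=headers) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if line.startswith("data:"):
                event = json.loads(line[5:].lstrip())
                print(event["timestamp"], event["type"], json.dumps(event["data"])[:80])
            # Lines starting with ":" are keepalive comments; blank lines end an event.
```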
@@ -1028,9 +1030,12 @@ def extract_entities(content: str) -> Dict[str, List[str]]: class EnrichmentStats: processed_total: int = 0 successes: int = 0 + skips: int = 0 failures: int = 0 last_success_id: Optional[str] = None last_success_at: Optional[str] = None + last_skip_id: Optional[str] = None + last_skip_at: Optional[str] = None last_error: Optional[str] = None last_error_at: Optional[str] = None @@ -1040,6 +1045,12 @@ def record_success(self, memory_id: str) -> None: self.last_success_id = memory_id self.last_success_at = utc_now() + def record_skip(self, memory_id: str) -> None: + self.processed_total += 1 + self.skips += 1 + self.last_skip_id = memory_id + self.last_skip_at = utc_now() + def record_failure(self, error: str) -> None: self.processed_total += 1 self.failures += 1 @@ -1050,9 +1061,12 @@ def to_dict(self) -> Dict[str, Any]: return { "processed_total": self.processed_total, "successes": self.successes, + "skips": self.skips, "failures": self.failures, "last_success_id": self.last_success_id, "last_success_at": self.last_success_at, + "last_skip_id": self.last_skip_id, + "last_skip_at": self.last_skip_at, "last_error": self.last_error, "last_error_at": self.last_error_at, } @@ -1142,9 +1156,11 @@ def require_api_token() -> None: if not API_TOKEN: return - # Allow unauthenticated health checks (supports blueprint endpoint names) + # Allow unauthenticated health checks and monitor page endpoint = request.endpoint or "" - if endpoint.endswith("health") or request.path == "/health": + if endpoint == "static" or request.path.startswith("/static/"): + return + if endpoint.endswith("health") or request.path in ("/health", "/monitor"): return token = _extract_api_token() @@ -1765,21 +1781,37 @@ def enrichment_worker() -> None: ) try: - processed = enrich_memory(job.memory_id, forced=job.forced) - state.enrichment_stats.record_success(job.memory_id) + result = enrich_memory(job.memory_id, forced=job.forced) + if result.get("processed", False): + state.enrichment_stats.record_success(job.memory_id) + else: + state.enrichment_stats.record_skip(job.memory_id) elapsed_ms = int((time.perf_counter() - enrich_start) * 1000) + emit_event( "enrichment.complete", { "memory_id": job.memory_id, - "success": True, "elapsed_ms": elapsed_ms, - "skipped": not processed, + "skipped": not result.get("processed", False), + "skip_reason": result.get("reason"), + # Full enrichment data: + "content": result.get("content", ""), + "tags_before": result.get("tags_before", []), + "tags_after": result.get("tags_after", []), + "tags_added": result.get("tags_added", []), + "entities": result.get("entities", {}), + "temporal_links": result.get("temporal_links", []), + "semantic_neighbors": result.get("semantic_neighbors", []), + "patterns_detected": result.get("patterns_detected", []), + "summary": result.get("summary", ""), }, utc_now, ) - if not processed: - logger.debug("Enrichment skipped for %s (already processed)", job.memory_id) + if not result.get("processed"): + logger.debug( + "Enrichment skipped for %s (%s)", job.memory_id, result.get("reason") + ) except Exception as exc: # pragma: no cover - background thread state.enrichment_stats.record_failure(str(exc)) elapsed_ms = int((time.perf_counter() - enrich_start) * 1000) @@ -2083,8 +2115,21 @@ def _run_sync_check() -> None: logger.exception("Sync check failed") -def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: - """Enrich a memory with relationships, patterns, and entity extraction.""" +def enrich_memory(memory_id: str, *, forced: 
bool = False) -> Dict[str, Any]: + """Enrich a memory with relationships, patterns, and entity extraction. + + Returns a dict with enrichment details: + - processed: True if enrichment was performed + - content: Original memory content + - tags_before: Tags before enrichment + - tags_after: Tags after enrichment (includes entity tags) + - tags_added: Delta of tags added during enrichment + - entities: Dict of extracted entities by category + - temporal_links: List of linked memory IDs (PRECEDED_BY relationships) + - semantic_neighbors: List of (id, score) tuples for similar memories + - patterns_detected: List of detected pattern info + - summary: Generated summary + """ graph = get_memory_graph() if graph is None: raise RuntimeError("FalkorDB unavailable for enrichment") @@ -2093,7 +2138,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: if not result.result_set: logger.debug("Skipping enrichment for %s; memory not found", memory_id) - return False + return {"processed": False, "reason": "memory_not_found"} node = result.result_set[0][0] properties = getattr(node, "properties", None) @@ -2107,12 +2152,14 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: already_processed = bool(properties.get("processed")) if already_processed and not forced: - return False + return {"processed": False, "reason": "already_processed"} content = properties.get("content", "") or "" entities = extract_entities(content) - tags = list(dict.fromkeys(_normalize_tag_list(properties.get("tags")))) + # Capture original tags before any modification + original_tags = list(dict.fromkeys(_normalize_tag_list(properties.get("tags")))) + tags = list(original_tags) # Copy for modification entity_tags: Set[str] = set() if entities: @@ -2134,7 +2181,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: tag_prefixes = _compute_tag_prefixes(tags) - temporal_links = find_temporal_relationships(graph, memory_id) + temporal_link_ids = find_temporal_relationships(graph, memory_id) pattern_info = detect_patterns(graph, memory_id, content) semantic_neighbors = link_semantic_neighbors(graph, memory_id) @@ -2151,7 +2198,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: { "last_run": utc_now(), "forced": forced, - "temporal_links": temporal_links, + "temporal_links": temporal_link_ids, "patterns_detected": pattern_info, "semantic_neighbors": [ {"id": neighbour_id, "score": score} for neighbour_id, score in semantic_neighbors @@ -2209,17 +2256,34 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool: logger.debug( "Enriched memory %s (temporal=%s, patterns=%s, semantic=%s)", memory_id, - temporal_links, + temporal_link_ids, pattern_info, len(semantic_neighbors), ) - return True + # Compute tags added during enrichment + tags_added = sorted(set(tags) - set(original_tags)) + + return { + "processed": True, + "content": content, + "tags_before": original_tags, + "tags_after": tags, + "tags_added": tags_added, + "entities": entities, + "temporal_links": temporal_link_ids, + "semantic_neighbors": [(nid[:8], round(score, 3)) for nid, score in semantic_neighbors], + "patterns_detected": pattern_info, + "summary": (summary or ""), + } + +def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> List[str]: + """Find and create temporal relationships with recent memories. 
-def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> int: - """Find and create temporal relationships with recent memories.""" - created = 0 + Returns list of linked memory IDs. + """ + linked_ids: List[str] = [] try: result = graph.query( """ @@ -2250,11 +2314,11 @@ def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> i """, {"id1": memory_id, "id2": related_id, "timestamp": timestamp}, ) - created += 1 + linked_ids.append(related_id) except Exception: logger.exception("Failed to find temporal relationships") - return created + return linked_ids def detect_patterns(graph: Any, memory_id: str, content: str) -> List[Dict[str, Any]]: @@ -2690,13 +2754,16 @@ def store_memory() -> Any: emit_event( "memory.store", { - "id": memory_id, - "content_preview": content[:100] + "..." if len(content) > 100 else content, + "memory_id": memory_id, + "content": content, # Full content "type": memory_type, + "type_confidence": type_confidence, "importance": importance, - "tags": tags[:5], - "size_bytes": len(content), - "elapsed_ms": int(response["query_time_ms"]), + "tags": tags, # All tags + "metadata": metadata, # Full metadata + "embedding_status": embedding_status, + "qdrant_status": qdrant_result, + "elapsed_ms": round(response["query_time_ms"], 2), }, utc_now, ) @@ -2972,23 +3039,65 @@ def recall_memories() -> Any: # Emit SSE event for real-time monitoring elapsed_ms = int((time.perf_counter() - query_start) * 1000) result_count = 0 + dedup_removed = 0 + result_summaries: List[Dict[str, Any]] = [] try: # Response is either a tuple (response, status) or Response object resp_data = response[0] if isinstance(response, tuple) else response if hasattr(resp_data, "get_json"): data = resp_data.get_json(silent=True) or {} - result_count = len(data.get("memories", [])) - except Exception as e: + results = data.get("results", []) + result_count = len(results) + dedup_removed = data.get("dedup_removed", 0) + + # Build result summaries for top 10 results + for r in results[:10]: + mem = r.get("memory") or {} + if not isinstance(mem, dict): + mem = {} + tags_list = mem.get("tags") or [] + result_summaries.append( + { + "id": str(r.get("id", ""))[:8], + "score": round(float(r.get("final_score", 0)), 3), + "type": mem.get("type", "?"), + "content_length": len(mem.get("content", "")), + "tags_count": ( + len(tags_list) if isinstance(tags_list, (list, tuple)) else 0 + ), + } + ) + except (TypeError, ValueError, AttributeError) as e: logger.debug("Failed to parse response for result_count", exc_info=e) + # Compute aggregate stats + avg_length = 0 + avg_tags = 0 + score_min = 0 + score_max = 0 + if result_summaries: + avg_length = int(sum(r["content_length"] for r in result_summaries) / len(result_summaries)) + avg_tags = round(sum(r["tags_count"] for r in result_summaries) / len(result_summaries), 1) + scores = [r["score"] for r in result_summaries] + score_min = min(scores) + score_max = max(scores) + emit_event( "memory.recall", { - "query": query_text[:50] if query_text else "(no query)", - "limit": limit, + "query": query_text, # Full query "result_count": result_count, + "dedup_removed": dedup_removed, "elapsed_ms": elapsed_ms, - "tags": tag_filters[:3] if tag_filters else [], + "tags_filter": tag_filters if tag_filters else [], + "has_time_filter": bool(start_time or end_time), + "vector_search": bool(query_text or embedding_param), + "stats": { + "avg_length": avg_length, + "avg_tags": avg_tags, + "score_range": [score_min, score_max], + }, + 
"result_summaries": result_summaries[:3], # Top 3 for display }, utc_now, ) @@ -3071,6 +3180,17 @@ def create_association() -> Any: if prop in relationship_props: response[prop] = relationship_props[prop] + emit_event( + "memory.associate", + { + "memory1_id": memory1_id, + "memory2_id": memory2_id, + "relation_type": relation_type, + "strength": strength, + }, + utc_now, + ) + return jsonify(response), 201 @@ -3727,6 +3847,12 @@ def get_related_memories(memory_id: str) -> Any: app.register_blueprint(stream_bp) +@app.route("/monitor") +def monitor_page() -> Any: + """Serve the real-time SSE monitor dashboard.""" + return app.send_static_file("monitor.html") + + if __name__ == "__main__": port = int(os.environ.get("PORT", "8001")) logger.info("Starting Flask API on port %s", port) diff --git a/automem/api/memory.py b/automem/api/memory.py index 1e4b0d7..8548a1b 100644 --- a/automem/api/memory.py +++ b/automem/api/memory.py @@ -7,6 +7,7 @@ from flask import Blueprint, abort, jsonify, request +from automem.api.stream import emit_event from automem.config import ( CLASSIFICATION_MODEL, MEMORY_AUTO_SUMMARIZE, @@ -346,6 +347,23 @@ def store() -> Any: "enrichment_queued": bool(state.enrichment_queue), }, ) + + emit_event( + "memory.store", + { + "memory_id": memory_id, + "content": content, + "type": memory_type, + "type_confidence": type_confidence, + "importance": importance, + "tags": tags, + "metadata": metadata, + "embedding_status": embedding_status, + "qdrant_status": qdrant_result, + "elapsed_ms": response["query_time_ms"], + }, + utc_now, + ) return jsonify(response), 201 @bp.route("/memory/", methods=["PATCH"]) @@ -611,6 +629,18 @@ def associate() -> Any: for prop in relation_config.get("properties", []): if prop in relationship_props: response[prop] = relationship_props[prop] + + emit_event( + "memory.associate", + { + "memory1_id": memory1_id, + "memory2_id": memory2_id, + "relation_type": relation_type, + "strength": strength, + "properties": relationship_props, + }, + utc_now, + ) return jsonify(response), 201 return bp diff --git a/automem/api/recall.py b/automem/api/recall.py index 82cb839..c448140 100644 --- a/automem/api/recall.py +++ b/automem/api/recall.py @@ -8,8 +8,10 @@ from flask import Blueprint, abort, jsonify, request +from automem.api.stream import emit_event from automem.config import ALLOWED_RELATIONS, RECALL_EXPANSION_LIMIT, RECALL_RELATION_LIMIT from automem.utils.graph import _serialize_node +from automem.utils.time import utc_now DEFAULT_STYLE_PRIORITY_TAGS: Set[str] = { "coding-style", @@ -1457,7 +1459,8 @@ def create_recall_blueprint( @bp.route("/recall", methods=["GET"]) def recall_memories() -> Any: - return handle_recall( + query_start = time.perf_counter() + response = handle_recall( get_memory_graph, get_qdrant_client, normalize_tag_list, @@ -1476,6 +1479,92 @@ def recall_memories() -> Any: expansion_limit_default=RECALL_EXPANSION_LIMIT, on_access=on_access, ) + elapsed_ms = int((time.perf_counter() - query_start) * 1000) + + result_count = 0 + dedup_removed = 0 + tags_filter: List[str] = [] + has_time_filter = False + vector_search_used = False + query_text = "" + result_summaries: List[Dict[str, Any]] = [] + + try: + data = response.get_json(silent=True) or {} + query_text = data.get("query") or "" + dedup_removed = int(data.get("dedup_removed") or 0) + tags_filter = data.get("tags") or [] + has_time_filter = bool(data.get("time_window")) + vector_info = data.get("vector_search") or {} + vector_search_used = bool(vector_info.get("matched")) or ( + 
bool(vector_info.get("enabled")) and bool(query_text) + ) + + results = data.get("results") or [] + result_count = int(data.get("count") or len(results)) + + for r in results[:10]: + if not isinstance(r, dict): + continue + memory = r.get("memory") or r.get("node") or {} + if not isinstance(memory, dict): + memory = {} + + tags_list = memory.get("tags") or [] + score_raw = r.get("final_score", r.get("score", 0)) + try: + score_val = round(float(score_raw or 0), 3) + except (TypeError, ValueError): + score_val = 0.0 + + result_summaries.append( + { + "id": str(r.get("id") or memory.get("id") or "")[:8], + "score": score_val, + "type": memory.get("type", "?"), + "content_length": len(memory.get("content", "") or ""), + "tags_count": len(tags_list) if isinstance(tags_list, (list, tuple)) else 0, + } + ) + except (AttributeError, TypeError, ValueError) as exc: + logger.debug("Failed to parse recall response for SSE stats", exc_info=exc) + + avg_length = 0 + avg_tags = 0 + score_min = 0 + score_max = 0 + if result_summaries: + avg_length = int( + sum(r["content_length"] for r in result_summaries) / len(result_summaries) + ) + avg_tags = round( + sum(r["tags_count"] for r in result_summaries) / len(result_summaries), 1 + ) + scores = [r["score"] for r in result_summaries] + score_min = min(scores) + score_max = max(scores) + + emit_event( + "memory.recall", + { + "query": query_text, + "result_count": result_count, + "dedup_removed": dedup_removed, + "elapsed_ms": elapsed_ms, + "tags_filter": tags_filter if tags_filter else [], + "has_time_filter": has_time_filter, + "vector_search": vector_search_used, + "stats": { + "avg_length": avg_length, + "avg_tags": avg_tags, + "score_range": [score_min, score_max], + }, + "result_summaries": result_summaries[:3], + }, + utc_now, + ) + + return response @bp.route("/startup-recall", methods=["GET"]) def startup_recall() -> Any: diff --git a/automem/api/stream.py b/automem/api/stream.py index 98f0cb1..547f3db 100644 --- a/automem/api/stream.py +++ b/automem/api/stream.py @@ -3,11 +3,17 @@ Provides a /stream endpoint that emits events for memory operations, enrichment, and consolidation tasks. Uses an in-memory subscriber pattern with bounded queues per client. + +Optionally logs events to a JSONL file for persistence (env: AUTOMEM_EVENT_LOG). """ from __future__ import annotations import json +import logging +import os +from collections import deque +from pathlib import Path from queue import Empty, Full, Queue from threading import Lock from typing import Any, Callable, Dict, Generator, List @@ -18,11 +24,73 @@ _subscribers: List[Queue] = [] _subscribers_lock = Lock() +# Event log configuration +_event_log_path = os.getenv("AUTOMEM_EVENT_LOG", "") +_event_log_max = int(os.getenv("AUTOMEM_EVENT_LOG_MAX", "500")) +_event_log_lock = Lock() +_logger = logging.getLogger(__name__) + + +def _get_event_log_path() -> Path | None: + if not _event_log_path: + return None + + raw_path = Path(_event_log_path) + if not raw_path.is_absolute(): + if ".." in raw_path.parts: + _logger.warning("Refusing AUTOMEM_EVENT_LOG path traversal: %s", _event_log_path) + return None + raw_path = Path.cwd() / raw_path + + try: + return raw_path.resolve(strict=False) + except (OSError, RuntimeError, ValueError) as exc: + _logger.warning("Invalid AUTOMEM_EVENT_LOG path %s: %s", _event_log_path, exc) + return None + + +def _write_event_to_log(event: Dict[str, Any]) -> None: + """Append event to JSONL log file, truncating if needed. + + Thread-safe. Only writes if AUTOMEM_EVENT_LOG is set. 
+ """ + path = _get_event_log_path() + if path is None: + return + + with _event_log_lock: + try: + path.parent.mkdir(parents=True, exist_ok=True) + except OSError as exc: + _logger.warning("Failed to create event log directory %s: %s", path.parent, exc) + return + + events: deque[str] = deque(maxlen=max(_event_log_max, 1)) + if path.exists(): + try: + with open(path, "r") as f: + for line in f: + stripped = line.strip() + if stripped: + events.append(stripped) + except OSError as exc: + _logger.warning("Failed to read event log %s: %s", path, exc) + + # Append new event + events.append(json.dumps(event)) + + try: + with open(path, "w") as f: + f.write("\n".join(events) + "\n") + except OSError as exc: + _logger.warning("Failed to write event log %s: %s", path, exc) + def emit_event(event_type: str, data: Dict[str, Any], utc_now: Callable[[], str]) -> None: """Emit an event to all SSE subscribers. Thread-safe. Drops events if a subscriber queue is full (slow client). + Also writes to JSONL log file if AUTOMEM_EVENT_LOG is configured. Args: event_type: Event type (e.g., "memory.store", "consolidation.run") @@ -34,6 +102,10 @@ def emit_event(event_type: str, data: Dict[str, Any], utc_now: Callable[[], str] "timestamp": utc_now(), "data": data, } + + # Write to log file if enabled + _write_event_to_log(event) + event_str = f"data: {json.dumps(event)}\n\n" with _subscribers_lock: @@ -50,6 +122,61 @@ def get_subscriber_count() -> int: return len(_subscribers) +def get_event_history(limit: int = 100) -> List[Dict[str, Any]]: + """Return recent events from the log file. + + Args: + limit: Maximum number of events to return + + Returns: + List of event dictionaries, oldest first + """ + path = _get_event_log_path() + if path is None: + return [] + + if not path.exists(): + return [] + + with _event_log_lock: + try: + with open(path, "r") as f: + lines = [line.strip() for line in f if line.strip()] + # Return last N events + return [json.loads(line) for line in lines[-limit:]] + except (OSError, json.JSONDecodeError, TypeError, ValueError) as exc: + _logger.warning("Failed to load event history from %s: %s", path, exc) + return [] + + +def get_log_status() -> Dict[str, Any]: + """Return event log status for display. 
+ + Returns: + Dict with enabled, path, size_bytes, event_count, max_events + """ + path = _get_event_log_path() + enabled = bool(path) + size = 0 + count = 0 + + if path and path.exists(): + try: + size = path.stat().st_size + with open(path, "r") as f: + count = sum(1 for line in f if line.strip()) + except OSError: + pass + + return { + "enabled": enabled, + "path": str(path) if path else None, + "size_bytes": size, + "event_count": count, + "max_events": _event_log_max, + } + + def create_stream_blueprint( require_api_token: Callable[[], None], ) -> Blueprint: @@ -112,4 +239,22 @@ def stream_status() -> Any: } ) + @bp.route("/stream/history", methods=["GET"]) + def stream_history() -> Any: + """Return cached events from log file for monitor hydration.""" + from flask import jsonify, request + + require_api_token() + limit = request.args.get("limit", 100, type=int) + events = get_event_history(min(limit, 500)) + return jsonify({"events": events, "count": len(events)}) + + @bp.route("/stream/log-status", methods=["GET"]) + def stream_log_status() -> Any: + """Return event log status (enabled, path, size, count).""" + from flask import jsonify + + require_api_token() + return jsonify(get_log_status()) + return bp diff --git a/requirements.txt b/requirements.txt index 73637e1..8aeb6d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ # requirements.txt - Updated versions for 2024/2025 flask==3.0.3 +flask-cors==6.0.0 falkordb==1.0.9 qdrant-client==1.11.3 python-dotenv==1.0.1 diff --git a/scripts/automem_watch.py b/scripts/automem_watch.py index 42f8fa5..a278d36 100755 --- a/scripts/automem_watch.py +++ b/scripts/automem_watch.py @@ -1,394 +1,475 @@ #!/usr/bin/env python3 -"""AutoMem real-time monitor - Terminal UI for observing memory service activity. +"""AutoMem tail - Simple streaming log of memory operations. Usage: - python scripts/automem_watch.py --url https://automem.railway.app --token $AUTOMEM_API_TOKEN + python scripts/automem_watch.py --url https://automem.up.railway.app --token $AUTOMEM_API_TOKEN python scripts/automem_watch.py --url http://localhost:8001 --token dev - -Features: - - Real-time SSE event streaming - - Garbage pattern detection (short content, test data, duplicates, bursts) - - Consolidation monitoring with timing - - Auto-reconnect on connection loss """ import argparse -import hashlib import json import sys -import threading +import textwrap import time -from collections import Counter, deque -from datetime import datetime -from typing import Deque, Dict, List, Optional, Set +from typing import Any, Mapping try: import httpx from rich.console import Console - from rich.layout import Layout - from rich.live import Live - from rich.panel import Panel - from rich.table import Table - from rich.text import Text + from rich.syntax import Syntax except ImportError: print("Required dependencies missing. 
Install with:") print(" pip install rich httpx") sys.exit(1) +console = Console() -class GarbageDetector: - """Detect suspicious patterns in memory stores.""" - - SUSPICIOUS_KEYWORDS = { - "test", - "asdf", - "xxx", - "lorem", - "foo", - "bar", - "baz", - "debug", - "tmp", - "placeholder", - "example", - "sample", - } - - def __init__(self, min_content_length: int = 10, burst_threshold: int = 5): - self.min_content_length = min_content_length - self.burst_threshold = burst_threshold - self.recent_hashes: Deque[str] = deque(maxlen=100) - self.warnings: Deque[str] = deque(maxlen=50) - self.counts: Counter = Counter() - self.burst_times: Deque[float] = deque(maxlen=20) - - def analyze(self, event: Dict) -> Optional[str]: - """Analyze a memory.store event for garbage patterns.""" - if event.get("type") != "memory.store": - return None - - data = event.get("data", {}) - content = data.get("content_preview", "") - now = time.time() - - # Check: Burst detection (>N stores in 10 seconds) - self.burst_times.append(now) - recent_count = sum(1 for t in self.burst_times if now - t < 10) - if recent_count >= self.burst_threshold: - warning = f"Burst detected: {recent_count} stores in 10s" - self._record(warning, "burst") - # Don't return - continue checking other patterns - - # Check: Very short content - if len(content) < self.min_content_length: - warning = f"Short ({len(content)} chars): '{content[:30]}'" - self._record(warning, "short_content") - return warning - - # Check: Test/debug keywords - content_lower = content.lower() - for kw in self.SUSPICIOUS_KEYWORDS: - if kw in content_lower and len(content) < 50: - warning = f"Test keyword '{kw}': '{content[:40]}...'" - self._record(warning, "test_keyword") - return warning - - # Check: No tags - tags = data.get("tags", []) - if not tags: - warning = f"No tags: '{content[:40]}...'" - self._record(warning, "no_tags") - return warning - - # Check: Very low importance with generic content - importance = data.get("importance", 0.5) - if importance < 0.3 and data.get("type") == "Memory": - warning = f"Low importance ({importance}): '{content[:40]}...'" - self._record(warning, "low_importance") - return warning - - # Check: Duplicate content (hash-based) - content_hash = hashlib.md5(content.encode()).hexdigest()[:8] - if content_hash in self.recent_hashes: - warning = f"Duplicate hash: '{content[:40]}...'" - self._record(warning, "duplicate") - return warning - self.recent_hashes.append(content_hash) - - return None - - def _record(self, warning: str, category: str) -> None: - ts = datetime.now().strftime("%H:%M:%S") - self.warnings.appendleft(f"[{ts}] {warning}") - self.counts[category] += 1 - - -class AutoMemMonitor: - """Real-time monitor for AutoMem service.""" - - def __init__(self, url: str, token: str, min_content_length: int = 10): - self.url = url.rstrip("/") - self.token = token - self.events: Deque[Dict] = deque(maxlen=100) - self.errors: Deque[str] = deque(maxlen=20) - self.stats = { - "stores": 0, - "recalls": 0, - "enriched": 0, - "consolidated": 0, - "errors": 0, - } - self.garbage = GarbageDetector(min_content_length) - self.start_time = datetime.now() - self.connected = False - self.last_event_time: Optional[datetime] = None - self.consolidation_history: Deque[Dict] = deque(maxlen=10) - - def connect(self): - """Connect to SSE stream and process events.""" - headers = {"Authorization": f"Bearer {self.token}"} - - with httpx.Client(timeout=None) as client: - with client.stream("GET", f"{self.url}/stream", headers=headers) as response: - if 
response.status_code != 200: - raise Exception(f"HTTP {response.status_code}: {response.text[:100]}") - self.connected = True - for line in response.iter_lines(): - if line.startswith("data: "): - try: - event = json.loads(line[6:]) - self._process_event(event) - except json.JSONDecodeError: - pass - elif line.startswith(":"): - # Keepalive comment - ignore - pass - - def _process_event(self, event: Dict) -> None: - """Process an incoming event.""" - self.last_event_time = datetime.now() - self.events.appendleft(event) - - event_type = event.get("type", "") - - # Update stats - if event_type == "memory.store": - self.stats["stores"] += 1 - # Check for garbage - warning = self.garbage.analyze(event) - if warning: - ts = datetime.now().strftime("%H:%M:%S") - self.errors.appendleft(f"[{ts}] [GARBAGE] {warning}") - elif event_type == "memory.recall": - self.stats["recalls"] += 1 - elif event_type == "enrichment.complete": - self.stats["enriched"] += 1 - elif event_type == "enrichment.failed": - self.stats["errors"] += 1 - ts = datetime.now().strftime("%H:%M:%S") - err = event.get("data", {}).get("error", "unknown")[:50] - self.errors.appendleft(f"[{ts}] [ENRICH] {err}") - elif event_type == "consolidation.run": - self.stats["consolidated"] += 1 - self.consolidation_history.appendleft(event.get("data", {})) - elif event_type == "error": - self.stats["errors"] += 1 - ts = datetime.now().strftime("%H:%M:%S") - err = event.get("data", {}).get("error", "unknown")[:50] - self.errors.appendleft(f"[{ts}] [ERROR] {err}") - - def render(self) -> Layout: - """Render the TUI layout.""" - layout = Layout() - - layout.split_column( - Layout(name="header", size=3), - Layout(name="main", ratio=2), - Layout(name="bottom", size=14), - ) +# Type alias for SSE event payloads +Event = Mapping[str, Any] - layout["main"].split_row( - Layout(name="events", ratio=2), - Layout(name="consolidation", ratio=1), - ) - layout["bottom"].split_row( - Layout(name="stats", ratio=1), - Layout(name="garbage", ratio=1), - Layout(name="errors", ratio=1), - ) +def format_timestamp(ts: Any) -> str: + """Extract just the time from ISO timestamp.""" + if ts is None: + return "" + ts = str(ts) + if "T" in ts: + return ts.split("T")[1][:12] + return ts[:12] - # Header - uptime = datetime.now() - self.start_time - uptime_str = ( - f"{int(uptime.total_seconds() // 3600)}h {int((uptime.total_seconds() % 3600) // 60)}m" - ) - status = "[green]Connected[/]" if self.connected else "[red]Disconnected[/]" - last_event = "" - if self.last_event_time: - ago = (datetime.now() - self.last_event_time).total_seconds() - if ago < 60: - last_event = f" | Last event: {int(ago)}s ago" + +def _safe_float(val: object, default: float = 0.0) -> float: + """Safely convert value to float.""" + try: + return float(val) if val is not None else default + except (TypeError, ValueError): + return default + + +def _safe_int(val: object, default: int = 0) -> int: + """Safely convert value to int.""" + try: + return int(float(val)) if val is not None else default + except (TypeError, ValueError): + return default + + +def print_store_event(event: Event) -> None: + """Print a memory.store event with full content.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + console.print() + console.print(f"[bold green]━━━ STORE[/] [dim]{ts}[/]") + + # Memory info line - defensive conversions + mem_id = str(data.get("memory_id", "?"))[:8] + mem_type = data.get("type", "Memory") + type_conf = _safe_float(data.get("type_confidence"), 0.0) + 
importance = _safe_float(data.get("importance"), 0.5) + elapsed = _safe_int(data.get("elapsed_ms"), 0) + + console.print( + f" [cyan]id:[/] {mem_id} " + f"[cyan]type:[/] {mem_type} ({type_conf:.2f}) " + f"[cyan]importance:[/] {importance} " + f"[dim]{elapsed}ms[/]" + ) + + # Tags - coerce to strings for safety + tags = data.get("tags", []) + if tags: + safe_tags = [str(t) for t in tags] + console.print(f" [cyan]tags:[/] {', '.join(safe_tags)}") + + # Metadata - truncate large values + metadata = data.get("metadata", {}) + if metadata: + + def safe_val(v: object, max_len: int = 50) -> str: + s = str(v) if isinstance(v, (str, int, float, bool)) else repr(v) + return s[:max_len] + "..." if len(s) > max_len else s + + meta_preview = ", ".join(f"{k}={safe_val(v)}" for k, v in list(metadata.items())[:5]) + if len(metadata) > 5: + meta_preview += f", ... (+{len(metadata) - 5} more)" + console.print(f" [cyan]metadata:[/] {{{meta_preview}}}") + + # Embedding status + emb_status = data.get("embedding_status", "") + qdrant_status = data.get("qdrant_status", "") + if emb_status or qdrant_status: + console.print(f" [cyan]embedding:[/] {emb_status} [cyan]qdrant:[/] {qdrant_status}") + + # Full content + content = data.get("content", "") + if content: + console.print(" [cyan]content:[/]") + # Wrap long content + wrapped = textwrap.fill(content, width=100, initial_indent=" ", subsequent_indent=" ") + console.print(f"[white]{wrapped}[/]") + + +def print_recall_event(event: Event) -> None: + """Print a memory.recall event with query details and result summaries.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + console.print() + console.print(f"[bold cyan]━━━ RECALL[/] [dim]{ts}[/]") + + # Query info - defensive conversions + query = data.get("query", "") + result_count = _safe_int(data.get("result_count"), 0) + dedup = _safe_int(data.get("dedup_removed"), 0) + elapsed = _safe_int(data.get("elapsed_ms"), 0) + + dedup_str = f" (dedup: {dedup})" if dedup else "" + console.print(f' [yellow]query:[/] "{query}"') + console.print(f" [yellow]results:[/] {result_count}{dedup_str} [dim]{elapsed}ms[/]") + + # Filters - defensive check for tags_filter + filters = [] + if data.get("has_time_filter"): + filters.append("time") + tags_filter = data.get("tags_filter") + if isinstance(tags_filter, (list, tuple, set)): + filters.append(f"tags({len(tags_filter)})") + elif tags_filter: + filters.append("tags(?)") + if data.get("vector_search"): + filters.append("vector") + if filters: + console.print(f" [yellow]filters:[/] {', '.join(filters)}") + + # Stats - defensive score_range handling + stats = data.get("stats", {}) + if stats: + avg_len = stats.get("avg_length", 0) + avg_tags = stats.get("avg_tags", 0) + score_range = stats.get("score_range", []) + + # Safely format score range + try: + if len(score_range) >= 2: + score_str = f"{float(score_range[0]):.2f}-{float(score_range[1]):.2f}" else: - last_event = f" | Last event: {int(ago // 60)}m ago" + score_str = "n/a" + except (TypeError, ValueError): + score_str = "n/a" + + console.print( + f" [yellow]stats:[/] avg_len={avg_len} avg_tags={avg_tags} " f"score_range={score_str}" + ) - layout["header"].update( - Panel( - f"[bold]AutoMem Watch[/] | {self.url} | {status} | Uptime: {uptime_str}{last_event}", - style="bold white on blue", + # Top results - defensive field conversions + summaries = data.get("result_summaries", []) + if summaries: + console.print(" [yellow]top results:[/]") + for i, r in enumerate(summaries[:3], 1): + r_type = 
str(r.get("type", "?"))[:8] + r_score = _safe_float(r.get("score"), 0.0) + r_len = _safe_int(r.get("content_length"), 0) + r_tags = _safe_int(r.get("tags_count"), 0) + console.print( + f" #{i} [{r_type:8s}] " f"score={r_score:.2f} " f"len={r_len} " f"tags={r_tags}" ) + + +def print_enrichment_event(event: Event) -> None: + """Print enrichment events with details.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + event_type = event.get("type", "") + + mem_id = str(data.get("memory_id", "?"))[:8] + + if event_type == "enrichment.start": + attempt = data.get("attempt", 1) + console.print(f"[dim]{ts}[/] [yellow]ENRICH[/] {mem_id} attempt {attempt}") + + elif event_type == "enrichment.complete": + elapsed = data.get("elapsed_ms", 0) + + if data.get("skipped"): + reason = data.get("skip_reason", "") + console.print(f"[dim]{ts}[/] [yellow]ENRICH[/] {mem_id} skipped ({reason})") + else: + console.print(f"\n[bold cyan]━━━ ENRICH[/] [dim]{ts}[/] [cyan]({elapsed}ms)[/]") + console.print(f" [dim]memory:[/] {mem_id}") + + # Content + content = data.get("content", "") + if content: + # Indent content lines + content_lines = content[:500].split("\n") + console.print(" [dim]content:[/]") + for line in content_lines[:8]: + console.print(f" {line}") + if len(content) > 500 or len(content_lines) > 8: + console.print(" [dim]...[/]") + + # Tags before/after + tags_before = data.get("tags_before", []) + tags_added = data.get("tags_added", []) + if tags_before or tags_added: + console.print("") + console.print(f" [dim]tags before:[/] {tags_before}") + if tags_added: + console.print(f" [green]tags added:[/] {tags_added}") + + # Entities by category + entities = data.get("entities", {}) + if entities and any(entities.values()): + console.print("") + console.print(" [dim]entities:[/]") + for category, values in entities.items(): + if values: + console.print(f" {category}: {', '.join(values)}") + + # Links created + temporal_links = data.get("temporal_links", []) + semantic_neighbors = data.get("semantic_neighbors", []) + patterns = data.get("patterns_detected", []) + + if temporal_links or semantic_neighbors or patterns: + console.print("") + console.print(" [dim]links created:[/]") + if temporal_links: + ids = [str(tid)[:8] for tid in temporal_links] + console.print(f" temporal: {', '.join(ids)} ({len(ids)} memories)") + if semantic_neighbors: + neighbor_strs = [] + for item in semantic_neighbors: + try: + nid, score = item[0], item[1] + neighbor_strs.append(f"{nid} ({score})") + except (IndexError, TypeError): + neighbor_strs.append(str(item)[:20]) + console.print(f" semantic: {', '.join(neighbor_strs)}") + if patterns: + for p in patterns: + ptype = p.get("type", "?") + similar = p.get("similar_memories", 0) + console.print(f" patterns: {ptype} ({similar} similar memories)") + + # Summary + summary = data.get("summary", "") + if summary: + console.print("") + console.print(f' [dim]summary:[/] "{summary[:100]}"') + + console.print("") # Blank line after + + elif event_type == "enrichment.failed": + error = data.get("error", "unknown")[:80] + attempt = data.get("attempt", 1) + will_retry = data.get("will_retry", False) + retry_str = " (will retry)" if will_retry else "" + console.print( + f"[dim]{ts}[/] [red]ENRICH FAIL[/] {mem_id} attempt {attempt}: {error}{retry_str}" ) - # Events table - events_table = Table(show_header=True, header_style="bold", expand=True, show_lines=False) - events_table.add_column("Time", width=8) - events_table.add_column("Type", width=18) - 
events_table.add_column("Details", overflow="ellipsis") - - for event in list(self.events)[:15]: - ts = event.get("timestamp", "") - if "T" in ts: - ts = ts.split("T")[1][:8] - event_type = event.get("type", "unknown") - data = event.get("data", {}) - - # Format based on event type - if event_type == "memory.store": - preview = data.get("content_preview", "")[:40] - details = f"{preview}... ({data.get('type', '?')})" - type_style = "green" - elif event_type == "memory.recall": - query = data.get("query", "")[:30] - details = ( - f"'{query}' -> {data.get('result_count', 0)} ({data.get('elapsed_ms', 0)}ms)" - ) - type_style = "cyan" - elif event_type == "enrichment.start": - details = f"{data.get('memory_id', '')[:8]}... attempt {data.get('attempt', 1)}" - type_style = "yellow" - elif event_type == "enrichment.complete": - status = "skipped" if data.get("skipped") else "done" - details = ( - f"{data.get('memory_id', '')[:8]}... {status} ({data.get('elapsed_ms', 0)}ms)" + +def print_consolidation_event(event: Event) -> None: + """Print consolidation events.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + task_type = data.get("task_type", "?") + affected = data.get("affected_count", 0) + elapsed = data.get("elapsed_ms", 0) + success = data.get("success", True) + + status = "[green]OK[/]" if success else "[red]FAIL[/]" + console.print( + f"[dim]{ts}[/] [magenta]CONSOLIDATE[/] " + f"{task_type}: {affected} affected ({elapsed}ms) {status}" + ) + + +def print_error_event(event: Event) -> None: + """Print error events.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + error = data.get("error", "unknown") + console.print(f"[dim]{ts}[/] [bold red]ERROR[/] {error}") + + +def print_associate_event(event: Event) -> None: + """Print a memory.associate event.""" + data = event.get("data", {}) + ts = format_timestamp(event.get("timestamp", "")) + + mem1 = str(data.get("memory1_id", "?"))[:8] + mem2 = str(data.get("memory2_id", "?"))[:8] + rel_type = data.get("relation_type", "?") + strength = data.get("strength", 0.5) + + console.print( + f"[dim]{ts}[/] [bold blue]ASSOCIATE[/] " + f"{mem1} [cyan]--{rel_type}-->[/] {mem2} " + f"[dim]strength={strength}[/]" + ) + + +def print_raw_event(event: Event) -> None: + """Print any other event as JSON.""" + ts = format_timestamp(event.get("timestamp", "")) + event_type = event.get("type", "unknown") + + console.print(f"[dim]{ts}[/] [blue]{event_type}[/]") + # Print data as formatted JSON + data = event.get("data", {}) + if data: + json_str = json.dumps(data, indent=2, default=str) + syntax = Syntax(json_str, "json", theme="monokai", line_numbers=False) + console.print(syntax) + + +def process_event(event: Event) -> None: + """Route event to appropriate printer.""" + event_type = event.get("type", "") + + if event_type == "memory.store": + print_store_event(event) + elif event_type == "memory.recall": + print_recall_event(event) + elif event_type == "memory.associate": + print_associate_event(event) + elif event_type.startswith("enrichment."): + print_enrichment_event(event) + elif event_type == "consolidation.run": + print_consolidation_event(event) + elif event_type == "error": + print_error_event(event) + else: + print_raw_event(event) + + +def stream_events(url: str, token: str, max_reconnects: int = 0) -> None: + """Connect to SSE stream and print events. 
+ + Args: + url: AutoMem API base URL + token: API authentication token + max_reconnects: Max reconnection attempts (0 = unlimited) + """ + headers = {"Authorization": f"Bearer {token}"} + reconnect_count = 0 + consecutive_ssl_errors = 0 + + console.print(f"[bold]Connecting to {url}/stream...[/]") + console.print("[dim]Press Ctrl+C to stop[/]") + console.print() + + while True: + try: + # Explicit timeout: finite connect/write, infinite read for SSE + timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=10.0) + with httpx.Client(timeout=timeout) as client: + with client.stream("GET", f"{url}/stream", headers=headers) as resp: + # Differentiate auth errors from transient errors + if resp.status_code in (401, 403): + console.print(f"[red]Authentication failed (HTTP {resp.status_code})[/]") + console.print("[dim]Check your API token[/]") + break + if resp.status_code >= 500: + raise httpx.HTTPStatusError( + f"Server error {resp.status_code}", + request=resp.request, + response=resp, + ) + if resp.status_code != 200: + console.print(f"[red]HTTP {resp.status_code}[/]") + break + + # Reset SSL error counter on successful connect + consecutive_ssl_errors = 0 + + if reconnect_count > 0: + console.print(f"[green]Connected[/] [dim](reconnect #{reconnect_count})[/]") + else: + console.print("[green]Connected[/]") + + # SSE multi-line data buffer (per RFC 8895) + data_buffer: list[str] = [] + + for line in resp.iter_lines(): + if line.startswith("data:"): + # Accumulate data lines (strip "data:" or "data: " prefix) + payload = line[5:].lstrip(" ") if len(line) > 5 else "" + data_buffer.append(payload) + elif line == "": + # Empty line = end of event, dispatch buffered data + if data_buffer: + full_data = "\n".join(data_buffer) + data_buffer.clear() + try: + event = json.loads(full_data) + process_event(event) + except json.JSONDecodeError: + console.print( + f"[dim red]Malformed event data:[/] {full_data[:100]}" + ) + elif line.startswith(":"): + # Keepalive comment - ignore silently + pass + + except KeyboardInterrupt: + console.print("\n[bold]Disconnected.[/]") + break + except (httpx.RequestError, httpx.HTTPStatusError) as e: + reconnect_count += 1 + error_str = str(e) + + # Detect SSL errors (Railway load balancer kills idle SSE connections) + is_ssl_error = "SSL" in error_str or "ssl" in error_str or "handshake" in error_str + + if is_ssl_error: + consecutive_ssl_errors += 1 + # Exponential backoff for SSL errors: 2s, 4s, 8s, 16s, max 30s + backoff = min(30, 2**consecutive_ssl_errors) + console.print( + f"[yellow]SSL connection reset[/] [dim](Railway LB, backoff {backoff}s)[/]" ) - type_style = "green" - elif event_type == "enrichment.failed": - details = f"{data.get('memory_id', '')[:8]}... 
{data.get('error', '')[:30]}" - type_style = "red" - elif event_type == "consolidation.run": - details = f"{data.get('task_type', '?')} - {data.get('affected_count', 0)} affected ({data.get('elapsed_ms', 0)}ms)" - type_style = "magenta" else: - details = str(data)[:50] - type_style = "white" - - events_table.add_row(ts, f"[{type_style}]{event_type}[/]", details) - - layout["events"].update(Panel(events_table, title="Events (latest 15)")) - - # Consolidation panel - consol_text = Text() - if self.consolidation_history: - for run in list(self.consolidation_history)[:5]: - task = run.get("task_type", "?") - affected = run.get("affected_count", 0) - elapsed = run.get("elapsed_ms", 0) - success = "[green]OK[/]" if run.get("success") else "[red]FAIL[/]" - next_run = run.get("next_scheduled", "?") - consol_text.append(f"{task}: ", style="bold") - consol_text.append(f"{affected} affected, {elapsed}ms ") - consol_text.append_markup(success) - consol_text.append(f"\n Next: {next_run}\n", style="dim") - else: - consol_text.append("No consolidation runs yet", style="dim") - layout["consolidation"].update(Panel(consol_text, title="Consolidation")) - - # Stats panel - stats_text = Text() - stats_text.append(f"Stores: {self.stats['stores']}\n", style="green") - stats_text.append(f"Recalls: {self.stats['recalls']}\n", style="cyan") - stats_text.append(f"Enriched: {self.stats['enriched']}\n", style="yellow") - stats_text.append(f"Consolidated: {self.stats['consolidated']}\n", style="magenta") - stats_text.append(f"Errors: {self.stats['errors']}\n", style="red") - layout["stats"].update(Panel(stats_text, title="Stats")) - - # Garbage detector panel - garbage_text = Text() - if self.garbage.counts: - for category, count in self.garbage.counts.most_common(5): - garbage_text.append(f"{category}: {count}\n", style="yellow") - garbage_text.append("\n") - for warning in list(self.garbage.warnings)[:3]: - garbage_text.append(f"{warning[:45]}...\n", style="dim yellow") - else: - garbage_text.append("No suspicious patterns", style="green") - layout["garbage"].update(Panel(garbage_text, title="Garbage Detector")) - - # Errors panel - errors_text = Text() - if self.errors: - for err in list(self.errors)[:5]: - errors_text.append(f"{err[:50]}\n", style="red") - else: - errors_text.append("No errors", style="green") - layout["errors"].update(Panel(errors_text, title="Errors")) + consecutive_ssl_errors = 0 + backoff = 5 + console.print(f"[red]Connection error:[/] {e}") + + # Check max reconnects + if max_reconnects > 0 and reconnect_count >= max_reconnects: + console.print(f"[red]Max reconnects ({max_reconnects}) reached. Exiting.[/]") + break - return layout + console.print(f"[dim]Reconnecting in {backoff}s... 
(attempt #{reconnect_count})[/]") + time.sleep(backoff) -def main(): +def main() -> None: parser = argparse.ArgumentParser( - description="AutoMem real-time monitor", + description="AutoMem tail - stream memory operations", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Local development python scripts/automem_watch.py --url http://localhost:8001 --token dev - - # Against Railway - python scripts/automem_watch.py --url https://automem.railway.app --token $AUTOMEM_API_TOKEN + python scripts/automem_watch.py --url https://automem.up.railway.app --token $TOKEN + python scripts/automem_watch.py --url $URL --token $TOKEN --max-reconnects 100 """, ) parser.add_argument("--url", required=True, help="AutoMem API URL") parser.add_argument("--token", required=True, help="API token") parser.add_argument( - "--min-content-length", + "--max-reconnects", type=int, - default=10, - help="Minimum content length before flagging as garbage (default: 10)", + default=0, + help="Max reconnection attempts (0 = unlimited, default)", ) args = parser.parse_args() - console = Console() - monitor = AutoMemMonitor(args.url, args.token, args.min_content_length) - - console.print(f"[bold]Connecting to {args.url}/stream...[/]") - - def connect_loop(): - while True: - try: - monitor.connect() - except KeyboardInterrupt: - break - except Exception as e: - monitor.connected = False - ts = datetime.now().strftime("%H:%M:%S") - monitor.errors.appendleft(f"[{ts}] [CONN] {str(e)[:50]}") - time.sleep(5) # Reconnect delay - - thread = threading.Thread(target=connect_loop, daemon=True) - thread.start() - - # Give connection time to establish - time.sleep(1) - - with Live(monitor.render(), console=console, refresh_per_second=2) as live: - try: - while True: - live.update(monitor.render()) - time.sleep(0.5) - except KeyboardInterrupt: - console.print("\n[bold]Disconnected.[/]") + stream_events(args.url.rstrip("/"), args.token, args.max_reconnects) if __name__ == "__main__": diff --git a/static/monitor.html b/static/monitor.html new file mode 100644 index 0000000..b94856d --- /dev/null +++ b/static/monitor.html @@ -0,0 +1,1131 @@ + + + + + + AutoMem Monitor + + + + +
[static/monitor.html — new file, 1,131 lines: the unified operations dashboard served at `GET /monitor`. The markup is not reproduced here; the page renders an "AutoMem Monitor" header with connection status ("Connecting..."), summary cards for Health (FalkorDB, Qdrant), Memories (total memories), Vectors, Enrichment (Queue / Failed / Pending), and Consolidation (Decay / Creative / Next), counters for Events, Errors, Avg Latency, and Reconnects, and a live event feed that starts as "Waiting for events...".]
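The dashboard hydrates its event feed from the endpoints added in `automem/api/stream.py` before attaching to `/stream`. A rough Python equivalent of that hydration step, using the response shapes defined in this diff (the URL and token below are placeholders):

```python
# Hydration sketch: check the event log status, then pull cached history.
import httpx

BASE_URL = "http://localhost:8001"          # placeholder
HEADERS = {"Authorization": "Bearer dev"}   # placeholder token

with httpx.Client(timeout=10.0) as client:
    status = client.get(f"{BASE_URL}/stream/log-status", headers=HEADERS).json()
    if not status.get("enabled"):
        print("AUTOMEM_EVENT_LOG not set; /stream/history will be empty")

    history = client.get(
        f"{BASE_URL}/stream/history", params={"limit": 100}, headers=HEADERS
    ).json()
    # Events come back oldest first, each shaped like {"type", "timestamp", "data"}.
    for event in history.get("events", []):
        print(event["timestamp"], event["type"])
    print(f'{history.get("count", 0)} cached events; log at {status.get("path")}')
```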
diff --git a/tests/test_enrichment.py b/tests/test_enrichment.py index 8458deb..5286fb9 100644 --- a/tests/test_enrichment.py +++ b/tests/test_enrichment.py @@ -108,8 +108,8 @@ def test_enrich_memory_updates_metadata(monkeypatch): fake_graph = FakeGraph() app.state.memory_graph = fake_graph - processed = app.enrich_memory("mem-1", forced=True) - assert processed is True + result = app.enrich_memory("mem-1", forced=True) + assert result.get("processed") is True assert fake_graph.temporal_calls, "Should create temporal relationships" assert fake_graph.pattern_calls, "Should update pattern nodes" @@ -119,7 +119,7 @@ def test_enrich_memory_updates_metadata(monkeypatch): update_payload = fake_graph.update_calls[-1] metadata = json.loads(update_payload["metadata"]) assert metadata["entities"]["projects"] == ["Launchpad"] - assert metadata["enrichment"]["temporal_links"] == 1 + assert len(metadata["enrichment"]["temporal_links"]) == 1 # Now returns list of IDs assert metadata["enrichment"]["patterns_detected"] assert update_payload["summary"].startswith("Met with Alice") assert "entity:projects:launchpad" in update_payload["tags"]
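The JSONL file behind `AUTOMEM_EVENT_LOG` stores one `{"type", "timestamp", "data"}` object per line, capped at `AUTOMEM_EVENT_LOG_MAX` entries, so it can also be inspected offline. A small sketch (the path below is a placeholder for whatever `AUTOMEM_EVENT_LOG` points to):

```python
# Offline inspection of the JSONL event log written by emit_event().
import json
from collections import Counter
from pathlib import Path

log_path = Path("automem-events.jsonl")  # placeholder; use your AUTOMEM_EVENT_LOG path

events = [
    json.loads(line)
    for line in log_path.read_text(encoding="utf-8").splitlines()
    if line.strip()
]

print(f"{len(events)} events")
for event_type, count in Counter(e["type"] for e in events).most_common():
    print(f"  {event_type}: {count}")

# enrichment.complete events in this diff carry "skipped" and "skip_reason".
for e in events:
    if e["type"] == "enrichment.complete" and e["data"].get("skipped"):
        print(e["timestamp"], e["data"].get("skip_reason"))
```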