26 commits
fe9902d
feat(watch): Enhanced SSE monitoring with detailed event data
jack-arturo Jan 9, 2026
c30abcd
feat(stream): Add memory.associate event logging
jack-arturo Jan 9, 2026
e582888
fix(test): Update enrichment test for dict return type
jack-arturo Jan 9, 2026
03a3eac
feat(stream): Enhanced enrichment events with full context
jack-arturo Jan 9, 2026
b16112c
fix: Production bugs - datetime tz, OpenAI tokens, Qdrant race
jack-arturo Jan 9, 2026
ad425b5
fix: Correct token parameter for gpt-5 models (max_output_tokens)
jack-arturo Jan 9, 2026
3666301
Merge branch 'main' into feat/enhanced-sse-monitoring
jack-arturo Jan 10, 2026
00012a2
fix: Defensive coding improvements for automem_watch.py
jack-arturo Jan 10, 2026
a621599
fix: More defensive coding for automem_watch.py payloads
jack-arturo Jan 10, 2026
05fb0b4
fix: Final defensive coding fixes for automem_watch.py
jack-arturo Jan 10, 2026
9538ab8
refactor(watch): Use Event type alias for all event handler functions
jack-arturo Jan 10, 2026
ef85ca0
fix(watch): Defensive unpacking for semantic_neighbors
jack-arturo Jan 10, 2026
5726da8
fix(watch): SSE multi-line parsing, narrow exceptions, auth error han…
jack-arturo Jan 10, 2026
f51551e
fix: Log malformed event data in stream_events
jack-arturo Jan 10, 2026
59ea7c4
feat: add Codex auto-fix workflow for CI failures
jack-arturo Jan 10, 2026
63f668a
fix(watch): SSE multi-line parsing, narrow exceptions, auth error han…
jack-arturo Jan 10, 2026
824a774
feat: Add web-based SSE monitor dashboard
jack-arturo Jan 10, 2026
ab08c1b
feat(monitor): Add login screen with shared auth (Graph Viewer style)
jack-arturo Jan 10, 2026
e766140
fix(monitor): Use api_key param instead of token for SSE auth
jack-arturo Jan 10, 2026
78d002c
feat(monitor): Unified operations dashboard with status cards and inl…
jack-arturo Jan 10, 2026
6d5d8be
feat: Add CORS support for cross-origin clients
jack-arturo Jan 11, 2026
8eabae6
feat(stream): Add optional JSONL event logging with history hydration
jack-arturo Jan 11, 2026
37ad479
docs: Add event logging and streaming endpoints to CLAUDE.md
jack-arturo Jan 11, 2026
d1bb238
revert: Remove auto-fix workflow and CI prompt files
jack-arturo Jan 13, 2026
151305e
fix: address CodeRabbit review feedback
jack-arturo Jan 13, 2026
0615de7
fix(monitor): emit store/recall/associate events
jack-arturo Jan 13, 2026
app.py (121 changes: 102 additions & 19 deletions)
@@ -1753,21 +1753,35 @@ def enrichment_worker() -> None:
)

try:
processed = enrich_memory(job.memory_id, forced=job.forced)
result = enrich_memory(job.memory_id, forced=job.forced)
state.enrichment_stats.record_success(job.memory_id)
elapsed_ms = int((time.perf_counter() - enrich_start) * 1000)

# Build entity counts for display
entities = result.get("entities", {})
entity_counts = {cat: len(vals) for cat, vals in entities.items() if vals}

emit_event(
"enrichment.complete",
{
"memory_id": job.memory_id,
"success": True,
"elapsed_ms": elapsed_ms,
"skipped": not processed,
"skipped": not result.get("processed", False),
"skip_reason": result.get("reason"),
# New detailed fields:
"entities": entity_counts,
"temporal_links": result.get("temporal_links", 0),
"patterns_detected": len(result.get("patterns_detected", [])),
"semantic_neighbors": result.get("semantic_neighbors", 0),
"summary_preview": result.get("summary", "")[:80],
"entity_tags_added": result.get("entity_tags_added", 0),
},
utc_now,
)
if not processed:
logger.debug("Enrichment skipped for %s (already processed)", job.memory_id)
if not result.get("processed"):
logger.debug(
"Enrichment skipped for %s (%s)", job.memory_id, result.get("reason")
)
except Exception as exc: # pragma: no cover - background thread
state.enrichment_stats.record_failure(str(exc))
elapsed_ms = int((time.perf_counter() - enrich_start) * 1000)
@@ -2071,8 +2085,18 @@ def _run_sync_check() -> None:
logger.exception("Sync check failed")


def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:
"""Enrich a memory with relationships, patterns, and entity extraction."""
def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]:
"""Enrich a memory with relationships, patterns, and entity extraction.

Returns a dict with enrichment details:
- processed: True if enrichment was performed
- entities: Dict of extracted entities by category
- temporal_links: Count of temporal relationships created
- patterns_detected: List of detected pattern info
- semantic_neighbors: Count of semantic neighbor links
- summary: Generated summary (truncated)
- entity_tags_added: Count of entity tags added
"""
graph = get_memory_graph()
if graph is None:
raise RuntimeError("FalkorDB unavailable for enrichment")
@@ -2081,7 +2105,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:

if not result.result_set:
logger.debug("Skipping enrichment for %s; memory not found", memory_id)
return False
return {"processed": False, "reason": "memory_not_found"}

node = result.result_set[0][0]
properties = getattr(node, "properties", None)
Expand All @@ -2095,7 +2119,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:

already_processed = bool(properties.get("processed"))
if already_processed and not forced:
return False
return {"processed": False, "reason": "already_processed"}

content = properties.get("content", "") or ""
entities = extract_entities(content)
@@ -2194,7 +2218,15 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:
len(semantic_neighbors),
)

return True
return {
"processed": True,
"entities": entities,
"temporal_links": temporal_links,
"patterns_detected": pattern_info,
"semantic_neighbors": len(semantic_neighbors),
"summary": (summary or "")[:100],
"entity_tags_added": len(entity_tags),
}


def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> int:
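For readers of the new docstring, here is an illustrative value of the dict enrich_memory now returns on a successful run. The field names follow the return statement in this diff; the concrete values, and the shape of each patterns_detected entry, are made up for illustration.

    result = {
        "processed": True,
        "entities": {"people": ["Ada"], "tools": ["FalkorDB", "Qdrant"]},  # categories -> extracted values
        "temporal_links": 2,                                  # count of temporal relationships created
        "patterns_detected": [{"name": "recurring_topic"}],   # entry shape assumed for illustration
        "semantic_neighbors": 3,                              # count of semantic neighbor links
        "summary": "Rolled out enhanced SSE monitoring for enrichment",  # truncated to 100 chars
        "entity_tags_added": 4,
    }

    # The enrichment worker above reduces entities to per-category counts for the event payload:
    entity_counts = {cat: len(vals) for cat, vals in result["entities"].items() if vals}
    # -> {"people": 1, "tools": 2}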
@@ -2670,13 +2702,16 @@ def store_memory() -> Any:
emit_event(
"memory.store",
{
"id": memory_id,
"content_preview": content[:100] + "..." if len(content) > 100 else content,
"memory_id": memory_id,
"content": content, # Full content
"type": memory_type,
"type_confidence": type_confidence,
"importance": importance,
"tags": tags[:5],
"size_bytes": len(content),
"elapsed_ms": int(response["query_time_ms"]),
"tags": tags, # All tags
"metadata": metadata, # Full metadata
"embedding_status": embedding_status,
"qdrant_status": qdrant_result,
"elapsed_ms": round(response["query_time_ms"], 2),
},
utc_now,
)
@@ -2952,23 +2987,60 @@ def recall_memories() -> Any:
# Emit SSE event for real-time monitoring
elapsed_ms = int((time.perf_counter() - query_start) * 1000)
result_count = 0
dedup_removed = 0
result_summaries: List[Dict[str, Any]] = []
try:
# Response is either a tuple (response, status) or Response object
resp_data = response[0] if isinstance(response, tuple) else response
if hasattr(resp_data, "get_json"):
data = resp_data.get_json(silent=True) or {}
result_count = len(data.get("memories", []))
results = data.get("results", [])
result_count = len(results)
dedup_removed = data.get("dedup_removed", 0)

# Build result summaries for top 10 results
for r in results[:10]:
mem = r.get("memory", {})
result_summaries.append(
{
"id": str(r.get("id", ""))[:8],
"score": round(float(r.get("final_score", 0)), 3),
"type": mem.get("type", "?"),
"content_length": len(mem.get("content", "")),
"tags_count": len(mem.get("tags", [])),
}
)
except Exception as e:
logger.debug("Failed to parse response for result_count", exc_info=e)

# Compute aggregate stats
avg_length = 0
avg_tags = 0
score_min = 0
score_max = 0
if result_summaries:
avg_length = int(sum(r["content_length"] for r in result_summaries) / len(result_summaries))
avg_tags = round(sum(r["tags_count"] for r in result_summaries) / len(result_summaries), 1)
scores = [r["score"] for r in result_summaries]
score_min = min(scores)
score_max = max(scores)

emit_event(
"memory.recall",
{
"query": query_text[:50] if query_text else "(no query)",
"limit": limit,
"query": query_text, # Full query
"result_count": result_count,
"dedup_removed": dedup_removed,
"elapsed_ms": elapsed_ms,
"tags": tag_filters[:3] if tag_filters else [],
"tags_filter": tag_filters if tag_filters else [],
"has_time_filter": bool(start_time or end_time),
"vector_search": bool(query_text or embedding_param),
"stats": {
"avg_length": avg_length,
"avg_tags": avg_tags,
"score_range": [score_min, score_max],
},
"result_summaries": result_summaries[:3], # Top 3 for display
},
utc_now,
)
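Putting the pieces together, an illustrative memory.recall event body as emitted above; the keys mirror the emit_event call, while the values are invented for illustration.

    recall_event = {
        "query": "sse monitoring rollout",
        "result_count": 7,
        "dedup_removed": 1,
        "elapsed_ms": 42,
        "tags_filter": ["automem"],
        "has_time_filter": False,
        "vector_search": True,
        "stats": {"avg_length": 512, "avg_tags": 3.4, "score_range": [0.61, 0.92]},
        "result_summaries": [  # top 3 of up to 10 collected summaries
            {"id": "fe9902d1", "score": 0.92, "type": "Decision", "content_length": 480, "tags_count": 4},
        ],
    }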
@@ -3051,6 +3123,17 @@ def create_association() -> Any:
if prop in relationship_props:
response[prop] = relationship_props[prop]

emit_event(
"memory.associate",
{
"memory1_id": memory1_id,
"memory2_id": memory2_id,
"relation_type": relation_type,
"strength": strength,
},
utc_now,
)

return jsonify(response), 201
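These emit_event calls feed the SSE stream that automem_watch.py and the new web dashboard consume. Below is a minimal consumer sketch, assuming the stream is exposed at /events on localhost:8001 and authenticated with an api_key query parameter (URL, port, and event framing are assumptions; only the api_key parameter name comes from the commit messages):

    import json

    import requests  # third-party: pip install requests

    STREAM_URL = "http://localhost:8001/events"  # assumed endpoint
    API_KEY = "changeme"

    def watch() -> None:
        """Print each event emitted via emit_event (memory.store, memory.recall, ...)."""
        with requests.get(STREAM_URL, params={"api_key": API_KEY}, stream=True, timeout=(5, None)) as resp:
            resp.raise_for_status()
            data_lines: list[str] = []
            event_name = "message"
            for raw in resp.iter_lines(decode_unicode=True):
                if raw is None:
                    continue
                if raw.startswith("event:"):
                    event_name = raw[len("event:"):].strip()
                elif raw.startswith("data:"):
                    # SSE data may span multiple data: lines; accumulate until the blank separator.
                    data_lines.append(raw[len("data:"):].strip())
                elif raw == "" and data_lines:
                    try:
                        payload = json.loads("\n".join(data_lines))
                    except json.JSONDecodeError:
                        payload = {"raw": "\n".join(data_lines)}
                    print(event_name, payload)
                    data_lines, event_name = [], "message"

    if __name__ == "__main__":
        watch()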

