26 commits
fe9902d
feat(watch): Enhanced SSE monitoring with detailed event data
jack-arturo Jan 9, 2026
c30abcd
feat(stream): Add memory.associate event logging
jack-arturo Jan 9, 2026
e582888
fix(test): Update enrichment test for dict return type
jack-arturo Jan 9, 2026
03a3eac
feat(stream): Enhanced enrichment events with full context
jack-arturo Jan 9, 2026
b16112c
fix: Production bugs - datetime tz, OpenAI tokens, Qdrant race
jack-arturo Jan 9, 2026
ad425b5
fix: Correct token parameter for gpt-5 models (max_output_tokens)
jack-arturo Jan 9, 2026
3666301
Merge branch 'main' into feat/enhanced-sse-monitoring
jack-arturo Jan 10, 2026
00012a2
fix: Defensive coding improvements for automem_watch.py
jack-arturo Jan 10, 2026
a621599
fix: More defensive coding for automem_watch.py payloads
jack-arturo Jan 10, 2026
05fb0b4
fix: Final defensive coding fixes for automem_watch.py
jack-arturo Jan 10, 2026
9538ab8
refactor(watch): Use Event type alias for all event handler functions
jack-arturo Jan 10, 2026
ef85ca0
fix(watch): Defensive unpacking for semantic_neighbors
jack-arturo Jan 10, 2026
5726da8
fix(watch): SSE multi-line parsing, narrow exceptions, auth error han…
jack-arturo Jan 10, 2026
f51551e
fix: Log malformed event data in stream_events
jack-arturo Jan 10, 2026
59ea7c4
feat: add Codex auto-fix workflow for CI failures
jack-arturo Jan 10, 2026
63f668a
fix(watch): SSE multi-line parsing, narrow exceptions, auth error han…
jack-arturo Jan 10, 2026
824a774
feat: Add web-based SSE monitor dashboard
jack-arturo Jan 10, 2026
ab08c1b
feat(monitor): Add login screen with shared auth (Graph Viewer style)
jack-arturo Jan 10, 2026
e766140
fix(monitor): Use api_key param instead of token for SSE auth
jack-arturo Jan 10, 2026
78d002c
feat(monitor): Unified operations dashboard with status cards and inl…
jack-arturo Jan 10, 2026
6d5d8be
feat: Add CORS support for cross-origin clients
jack-arturo Jan 11, 2026
8eabae6
feat(stream): Add optional JSONL event logging with history hydration
jack-arturo Jan 11, 2026
37ad479
docs: Add event logging and streaming endpoints to CLAUDE.md
jack-arturo Jan 11, 2026
d1bb238
revert: Remove auto-fix workflow and CI prompt files
jack-arturo Jan 13, 2026
151305e
fix: address CodeRabbit review feedback
jack-arturo Jan 13, 2026
0615de7
fix(monitor): emit store/recall/associate events
jack-arturo Jan 13, 2026
152 changes: 124 additions & 28 deletions app.py
@@ -1765,21 +1765,34 @@ def enrichment_worker() -> None:
)

try:
processed = enrich_memory(job.memory_id, forced=job.forced)
result = enrich_memory(job.memory_id, forced=job.forced)
state.enrichment_stats.record_success(job.memory_id)
elapsed_ms = int((time.perf_counter() - enrich_start) * 1000)

emit_event(
"enrichment.complete",
{
"memory_id": job.memory_id,
"success": True,
"elapsed_ms": elapsed_ms,
"skipped": not processed,
"skipped": not result.get("processed", False),
"skip_reason": result.get("reason"),
# Full enrichment data:
"content": result.get("content", ""),
"tags_before": result.get("tags_before", []),
"tags_after": result.get("tags_after", []),
"tags_added": result.get("tags_added", []),
"entities": result.get("entities", {}),
"temporal_links": result.get("temporal_links", []),
"semantic_neighbors": result.get("semantic_neighbors", []),
"patterns_detected": result.get("patterns_detected", []),
"summary": result.get("summary", ""),
},
utc_now,
)
if not processed:
logger.debug("Enrichment skipped for %s (already processed)", job.memory_id)
if not result.get("processed"):
logger.debug(
"Enrichment skipped for %s (%s)", job.memory_id, result.get("reason")
)
except Exception as exc: # pragma: no cover - background thread
state.enrichment_stats.record_failure(str(exc))
elapsed_ms = int((time.perf_counter() - enrich_start) * 1000)
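Note for reviewers: because enrichment.complete now carries the full enrichment result rather than a boolean, a watcher can summarize the tag delta and neighbor scores straight from the payload. A minimal sketch of such a handler follows; the function name and output format are illustrative and not taken from automem_watch.py, but the payload keys match the emit_event call above.

from typing import Any, Dict

def handle_enrichment_complete(payload: Dict[str, Any]) -> None:
    # payload is the dict passed to emit_event above; SSE framing/deserialization omitted.
    if payload.get("skipped"):
        print(f"skipped {payload.get('memory_id')}: {payload.get('skip_reason')}")
        return
    tags_added = payload.get("tags_added") or []
    neighbors = payload.get("semantic_neighbors") or []
    # Neighbors arrive as (id, score) pairs; guard against malformed entries.
    pairs = ", ".join(
        f"{n[0]}:{n[1]}" for n in neighbors if isinstance(n, (list, tuple)) and len(n) == 2
    )
    print(
        f"enriched {payload.get('memory_id')} in {payload.get('elapsed_ms')}ms "
        f"(+{len(tags_added)} tags; neighbors: {pairs or 'none'})"
    )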
@@ -2083,8 +2096,21 @@ def _run_sync_check() -> None:
logger.exception("Sync check failed")


def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:
"""Enrich a memory with relationships, patterns, and entity extraction."""
def enrich_memory(memory_id: str, *, forced: bool = False) -> Dict[str, Any]:
"""Enrich a memory with relationships, patterns, and entity extraction.

Returns a dict with enrichment details:
- processed: True if enrichment was performed
- content: Original memory content
- tags_before: Tags before enrichment
- tags_after: Tags after enrichment (includes entity tags)
- tags_added: Delta of tags added during enrichment
- entities: Dict of extracted entities by category
- temporal_links: List of linked memory IDs (PRECEDED_BY relationships)
- semantic_neighbors: List of (id, score) tuples for similar memories
- patterns_detected: List of detected pattern info
- summary: Generated summary
"""
graph = get_memory_graph()
if graph is None:
raise RuntimeError("FalkorDB unavailable for enrichment")
@@ -2093,7 +2119,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:

if not result.result_set:
logger.debug("Skipping enrichment for %s; memory not found", memory_id)
return False
return {"processed": False, "reason": "memory_not_found"}

node = result.result_set[0][0]
properties = getattr(node, "properties", None)
@@ -2107,12 +2133,14 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:

already_processed = bool(properties.get("processed"))
if already_processed and not forced:
return False
return {"processed": False, "reason": "already_processed"}

content = properties.get("content", "") or ""
entities = extract_entities(content)

tags = list(dict.fromkeys(_normalize_tag_list(properties.get("tags"))))
# Capture original tags before any modification
original_tags = list(dict.fromkeys(_normalize_tag_list(properties.get("tags"))))
tags = list(original_tags) # Copy for modification
entity_tags: Set[str] = set()

if entities:
@@ -2134,7 +2162,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:

tag_prefixes = _compute_tag_prefixes(tags)

temporal_links = find_temporal_relationships(graph, memory_id)
temporal_link_ids = find_temporal_relationships(graph, memory_id)
pattern_info = detect_patterns(graph, memory_id, content)
semantic_neighbors = link_semantic_neighbors(graph, memory_id)

@@ -2151,7 +2179,7 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:
{
"last_run": utc_now(),
"forced": forced,
"temporal_links": temporal_links,
"temporal_links": temporal_link_ids,
"patterns_detected": pattern_info,
"semantic_neighbors": [
{"id": neighbour_id, "score": score} for neighbour_id, score in semantic_neighbors
@@ -2209,17 +2237,34 @@ def enrich_memory(memory_id: str, *, forced: bool = False) -> bool:
logger.debug(
"Enriched memory %s (temporal=%s, patterns=%s, semantic=%s)",
memory_id,
temporal_links,
temporal_link_ids,
pattern_info,
len(semantic_neighbors),
)

return True
# Compute tags added during enrichment
tags_added = sorted(set(tags) - set(original_tags))

return {
"processed": True,
"content": content,
"tags_before": original_tags,
"tags_after": tags,
"tags_added": tags_added,
"entities": entities,
"temporal_links": temporal_link_ids,
"semantic_neighbors": [(nid[:8], round(score, 3)) for nid, score in semantic_neighbors],
"patterns_detected": pattern_info,
"summary": (summary or ""),
}


def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> int:
"""Find and create temporal relationships with recent memories."""
created = 0
def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> List[str]:
"""Find and create temporal relationships with recent memories.

Returns list of linked memory IDs.
"""
linked_ids: List[str] = []
try:
result = graph.query(
"""
@@ -2250,11 +2295,11 @@ def find_temporal_relationships(graph: Any, memory_id: str, limit: int = 5) -> i
""",
{"id1": memory_id, "id2": related_id, "timestamp": timestamp},
)
created += 1
linked_ids.append(related_id)
except Exception:
logger.exception("Failed to find temporal relationships")

return created
return linked_ids


def detect_patterns(graph: Any, memory_id: str, content: str) -> List[Dict[str, Any]]:
@@ -2690,13 +2735,16 @@ def store_memory() -> Any:
emit_event(
"memory.store",
{
"id": memory_id,
"content_preview": content[:100] + "..." if len(content) > 100 else content,
"memory_id": memory_id,
"content": content, # Full content
"type": memory_type,
"type_confidence": type_confidence,
"importance": importance,
"tags": tags[:5],
"size_bytes": len(content),
"elapsed_ms": int(response["query_time_ms"]),
"tags": tags, # All tags
"metadata": metadata, # Full metadata
"embedding_status": embedding_status,
"qdrant_status": qdrant_result,
"elapsed_ms": round(response["query_time_ms"], 2),
},
utc_now,
)
@@ -2972,23 +3020,60 @@ def recall_memories() -> Any:
# Emit SSE event for real-time monitoring
elapsed_ms = int((time.perf_counter() - query_start) * 1000)
result_count = 0
dedup_removed = 0
result_summaries: List[Dict[str, Any]] = []
try:
# Response is either a tuple (response, status) or Response object
resp_data = response[0] if isinstance(response, tuple) else response
if hasattr(resp_data, "get_json"):
data = resp_data.get_json(silent=True) or {}
result_count = len(data.get("memories", []))
results = data.get("results", [])
result_count = len(results)
dedup_removed = data.get("dedup_removed", 0)

# Build result summaries for top 10 results
for r in results[:10]:
mem = r.get("memory", {})
result_summaries.append(
{
"id": str(r.get("id", ""))[:8],
"score": round(float(r.get("final_score", 0)), 3),
"type": mem.get("type", "?"),
"content_length": len(mem.get("content", "")),
"tags_count": len(mem.get("tags", [])),
}
)
except Exception as e:
logger.debug("Failed to parse response for result_count", exc_info=e)

# Compute aggregate stats
avg_length = 0
avg_tags = 0
score_min = 0
score_max = 0
if result_summaries:
avg_length = int(sum(r["content_length"] for r in result_summaries) / len(result_summaries))
avg_tags = round(sum(r["tags_count"] for r in result_summaries) / len(result_summaries), 1)
scores = [r["score"] for r in result_summaries]
score_min = min(scores)
score_max = max(scores)

emit_event(
"memory.recall",
{
"query": query_text[:50] if query_text else "(no query)",
"limit": limit,
"query": query_text, # Full query
"result_count": result_count,
"dedup_removed": dedup_removed,
"elapsed_ms": elapsed_ms,
"tags": tag_filters[:3] if tag_filters else [],
"tags_filter": tag_filters if tag_filters else [],
"has_time_filter": bool(start_time or end_time),
"vector_search": bool(query_text or embedding_param),
"stats": {
"avg_length": avg_length,
"avg_tags": avg_tags,
"score_range": [score_min, score_max],
},
"result_summaries": result_summaries[:3], # Top 3 for display
},
utc_now,
)
@@ -3071,6 +3156,17 @@ def create_association() -> Any:
if prop in relationship_props:
response[prop] = relationship_props[prop]

emit_event(
"memory.associate",
{
"memory1_id": memory1_id,
"memory2_id": memory2_id,
"relation_type": relation_type,
"strength": strength,
},
utc_now,
)

return jsonify(response), 201


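With memory.store, memory.recall, memory.associate, and enrichment.complete all emitting structured payloads, a monitoring client only needs to read the SSE stream and dispatch on event type. Below is a minimal standalone sketch, assuming a /events endpoint, the api_key query parameter introduced in this PR, and JSON-encoded data: lines; the URL, port, and field layout are assumptions for illustration, not the documented API.

import json
import requests

STREAM_URL = "http://localhost:8001/events"  # hypothetical endpoint; adjust to your deployment
API_KEY = "changeme"  # placeholder credential

def watch() -> None:
    # Stream SSE events and print one line per parsed event.
    with requests.get(
        STREAM_URL, params={"api_key": API_KEY}, stream=True, timeout=(5, None)
    ) as resp:
        resp.raise_for_status()
        event_type, data_lines = None, []
        for raw in resp.iter_lines(decode_unicode=True):
            if raw.startswith("event:"):
                event_type = raw[len("event:"):].strip()
            elif raw.startswith("data:"):
                # SSE data may span multiple lines; collect them all.
                data_lines.append(raw[len("data:"):].strip())
            elif raw == "" and data_lines:
                # A blank line terminates one SSE event.
                try:
                    payload = json.loads("\n".join(data_lines))
                except json.JSONDecodeError:
                    payload = {"raw": "\n".join(data_lines)}
                print(event_type or "message", json.dumps(payload)[:120])
                event_type, data_lines = None, []

if __name__ == "__main__":
    watch()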