From 478cf310bb3fcb7a7aee46104189c1f895b6ffcc Mon Sep 17 00:00:00 2001 From: mrveiss Date: Mon, 6 Apr 2026 17:07:20 +0300 Subject: [PATCH 1/2] =?UTF-8?q?feat(knowledge):=20Redis=20memory=20graph?= =?UTF-8?q?=20schema=20=E2=80=94=20entity/relation=20indexes=20and=20chat?= =?UTF-8?q?=20integration=20(#3385)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../knowledge/memory_graph/__init__.py | 49 ++ .../knowledge/memory_graph/graph_store.py | 317 ++++++++++++ .../memory_graph/graph_store_test.py | 472 ++++++++++++++++++ .../knowledge/memory_graph/schema.py | 193 +++++++ 4 files changed, 1031 insertions(+) create mode 100644 autobot-backend/knowledge/memory_graph/__init__.py create mode 100644 autobot-backend/knowledge/memory_graph/graph_store.py create mode 100644 autobot-backend/knowledge/memory_graph/graph_store_test.py create mode 100644 autobot-backend/knowledge/memory_graph/schema.py diff --git a/autobot-backend/knowledge/memory_graph/__init__.py b/autobot-backend/knowledge/memory_graph/__init__.py new file mode 100644 index 000000000..f7982bd3d --- /dev/null +++ b/autobot-backend/knowledge/memory_graph/__init__.py @@ -0,0 +1,49 @@ +# Copyright (c) mrveiss. All rights reserved. +""" +knowledge.memory_graph — Redis DB 2 memory graph foundation layer. + +Week 1 deliverable for issue #3385. Provides: +- Schema constants and index creation (schema.py) +- Entity / relation CRUD and BFS traversal (graph_store.py) + +Public API re-exported from this package so callers can write: + from knowledge.memory_graph import create_entity, get_entity, create_relation +""" + +from .graph_store import ( # noqa: F401 + create_entity, + create_relation, + get_entity, + get_incoming_relations, + get_outgoing_relations, + traverse_relations, +) +from .schema import ( # noqa: F401 + ENTITY_KEY_PREFIX, + ENTITY_TYPES, + FULLTEXT_INDEX_NAME, + PRIMARY_INDEX_NAME, + RELATION_TYPES, + RELATIONS_IN_PREFIX, + RELATIONS_OUT_PREFIX, + ensure_indexes, +) + +__all__ = [ + # schema + "ENTITY_KEY_PREFIX", + "ENTITY_TYPES", + "FULLTEXT_INDEX_NAME", + "PRIMARY_INDEX_NAME", + "RELATION_TYPES", + "RELATIONS_IN_PREFIX", + "RELATIONS_OUT_PREFIX", + "ensure_indexes", + # graph_store + "create_entity", + "create_relation", + "get_entity", + "get_incoming_relations", + "get_outgoing_relations", + "traverse_relations", +] diff --git a/autobot-backend/knowledge/memory_graph/graph_store.py b/autobot-backend/knowledge/memory_graph/graph_store.py new file mode 100644 index 000000000..460730d94 --- /dev/null +++ b/autobot-backend/knowledge/memory_graph/graph_store.py @@ -0,0 +1,317 @@ +# Copyright (c) mrveiss. All rights reserved. +""" +knowledge.memory_graph.graph_store — entity/relation CRUD and BFS traversal. + +All public functions are async and accept an explicit *redis_client* parameter +so they remain independently testable. Callers that want a shared client +should call ``get_redis_client(async_client=True, database="knowledge")`` once +and pass it through. + +Public API +---------- +create_entity(redis_client, entity_type, name, observations, metadata) -> dict +get_entity(redis_client, entity_id) -> dict | None +create_relation(redis_client, from_id, to_id, rel_type, metadata) -> bool +get_outgoing_relations(redis_client, entity_id) -> list[dict] +get_incoming_relations(redis_client, entity_id) -> list[dict] +traverse_relations(redis_client, entity_id, relation_type, max_depth) -> list[dict] +""" + +import asyncio +import logging +import uuid +from collections import deque +from time import time +from typing import Any, Dict, List, Optional + +from .schema import ( + ENTITY_KEY_PREFIX, + ENTITY_TYPES, + RELATION_TYPES, + RELATIONS_IN_PREFIX, + RELATIONS_OUT_PREFIX, +) + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _now_ms() -> int: + """Return current Unix time in milliseconds.""" + return int(time() * 1000) + + +def _entity_key(entity_id: str) -> str: + return f"{ENTITY_KEY_PREFIX}{entity_id}" + + +def _out_key(entity_id: str) -> str: + return f"{RELATIONS_OUT_PREFIX}{entity_id}" + + +def _in_key(entity_id: str) -> str: + return f"{RELATIONS_IN_PREFIX}{entity_id}" + + +async def _init_relation_doc(redis_client: Any, key: str, owner_id: str) -> None: + """Create an empty relation document at *key* if it does not exist.""" + if not await redis_client.exists(key): + await redis_client.json().set( + key, "$", {"entity_id": owner_id, "relations": []} + ) + + +# --------------------------------------------------------------------------- +# Entity CRUD +# --------------------------------------------------------------------------- + + +async def create_entity( + redis_client: Any, + entity_type: str, + name: str, + observations: List[str], + metadata: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Create and persist a new memory-graph entity. + + Args: + redis_client: Async redis-py client (knowledge DB). + entity_type: One of the values in ENTITY_TYPES. + name: Human-readable entity name. + observations: Initial observations list. + metadata: Optional dict merged into entity ``metadata`` field. + + Returns: + The persisted entity document (dict). + + Raises: + ValueError: *entity_type* is not recognised or *name* is blank. + RuntimeError: Redis write failed. + """ + if entity_type not in ENTITY_TYPES: + raise ValueError("Invalid entity_type %r; expected one of %s" % (entity_type, sorted(ENTITY_TYPES))) + if not name or not name.strip(): + raise ValueError("Entity name must not be empty") + + entity_id = str(uuid.uuid4()) + now = _now_ms() + base_metadata: Dict[str, Any] = { + "user_id": "autobot", + "priority": "medium", + "status": "active", + "tags": [], + "session_id": "", + } + if metadata: + base_metadata.update(metadata) + + entity: Dict[str, Any] = { + "id": entity_id, + "type": entity_type, + "name": name, + "created_at": now, + "updated_at": now, + "observations": list(observations), + "metadata": base_metadata, + } + + try: + await redis_client.json().set(_entity_key(entity_id), "$", entity) + except Exception as exc: + logger.error("Failed to persist entity %s: %s", entity_id, exc) + raise RuntimeError("Entity creation failed") from exc + + logger.info("Created entity id=%s type=%s name=%r", entity_id[:8], entity_type, name) + return entity + + +async def get_entity( + redis_client: Any, + entity_id: str, +) -> Optional[Dict[str, Any]]: + """Fetch a single entity by UUID. + + Args: + redis_client: Async redis-py client (knowledge DB). + entity_id: UUID of the entity. + + Returns: + Entity dict, or ``None`` if not found. + """ + try: + return await redis_client.json().get(_entity_key(entity_id)) + except Exception as exc: + logger.error("Failed to fetch entity %s: %s", entity_id, exc) + return None + + +# --------------------------------------------------------------------------- +# Relation CRUD +# --------------------------------------------------------------------------- + + +async def create_relation( + redis_client: Any, + from_id: str, + to_id: str, + rel_type: str, + metadata: Optional[Dict[str, Any]] = None, +) -> bool: + """Create a bidirectional relation between two entities. + + The outgoing document at ``memory:relations:out:{from_id}`` and the + incoming document at ``memory:relations:in:{to_id}`` are both updated + atomically inside a Redis pipeline. + + Args: + redis_client: Async redis-py client (knowledge DB). + from_id: Source entity UUID. + to_id: Target entity UUID. + rel_type: One of the values in RELATION_TYPES. + metadata: Optional dict stored on the outgoing relation edge. + + Returns: + ``True`` on success, ``False`` on error. + + Raises: + ValueError: *rel_type* is not recognised. + """ + if rel_type not in RELATION_TYPES: + raise ValueError("Invalid rel_type %r; expected one of %s" % (rel_type, sorted(RELATION_TYPES))) + + now = _now_ms() + outgoing_rel = { + "to": to_id, + "type": rel_type, + "created_at": now, + "metadata": metadata or {}, + } + incoming_rel = { + "from": from_id, + "type": rel_type, + "created_at": now, + } + + try: + out_k = _out_key(from_id) + in_k = _in_key(to_id) + await asyncio.gather( + _init_relation_doc(redis_client, out_k, from_id), + _init_relation_doc(redis_client, in_k, to_id), + ) + pipe = redis_client.pipeline() + pipe.json().arrappend(out_k, "$.relations", outgoing_rel) + pipe.json().arrappend(in_k, "$.relations", incoming_rel) + await pipe.execute() + except Exception as exc: + logger.error("Failed to create relation %s-[%s]->%s: %s", from_id, rel_type, to_id, exc) + return False + + logger.info( + "Created relation %s-[%s]->%s", from_id[:8], rel_type, to_id[:8] + ) + return True + + +async def get_outgoing_relations( + redis_client: Any, + entity_id: str, +) -> List[Dict[str, Any]]: + """Return all outgoing relation edges for *entity_id*. + + Args: + redis_client: Async redis-py client (knowledge DB). + entity_id: UUID of the source entity. + + Returns: + List of outgoing relation dicts (``{"to": ..., "type": ..., ...}``). + """ + try: + doc = await redis_client.json().get(_out_key(entity_id)) + return doc.get("relations", []) if doc else [] + except Exception as exc: + logger.debug("Could not read outgoing relations for %s: %s", entity_id, exc) + return [] + + +async def get_incoming_relations( + redis_client: Any, + entity_id: str, +) -> List[Dict[str, Any]]: + """Return all incoming relation edges for *entity_id*. + + Args: + redis_client: Async redis-py client (knowledge DB). + entity_id: UUID of the target entity. + + Returns: + List of incoming relation dicts (``{"from": ..., "type": ..., ...}``). + """ + try: + doc = await redis_client.json().get(_in_key(entity_id)) + return doc.get("relations", []) if doc else [] + except Exception as exc: + logger.debug("Could not read incoming relations for %s: %s", entity_id, exc) + return [] + + +# --------------------------------------------------------------------------- +# Graph traversal +# --------------------------------------------------------------------------- + + +async def traverse_relations( + redis_client: Any, + entity_id: str, + relation_type: Optional[str] = None, + max_depth: int = 3, +) -> List[Dict[str, Any]]: + """BFS traversal starting from *entity_id* following outgoing edges. + + Args: + redis_client: Async redis-py client (knowledge DB). + entity_id: UUID of the starting entity. + relation_type: When set, only edges of this type are followed. + max_depth: Maximum hop count (1 = direct neighbours only). + + Returns: + Ordered list of entity dicts encountered during traversal + (excluding the root entity itself). + """ + visited: set = {entity_id} + queue: deque = deque([(entity_id, 0)]) + result: List[Dict[str, Any]] = [] + + while queue: + current_id, depth = queue.popleft() + if depth >= max_depth: + continue + + relations = await get_outgoing_relations(redis_client, current_id) + next_ids = [ + r["to"] + for r in relations + if (relation_type is None or r.get("type") == relation_type) + and r["to"] not in visited + ] + + if not next_ids: + continue + + entities = await asyncio.gather( + *[get_entity(redis_client, nid) for nid in next_ids], + return_exceptions=True, + ) + + for nid, entity in zip(next_ids, entities): + if isinstance(entity, Exception) or entity is None: + continue + visited.add(nid) + result.append(entity) + queue.append((nid, depth + 1)) + + return result diff --git a/autobot-backend/knowledge/memory_graph/graph_store_test.py b/autobot-backend/knowledge/memory_graph/graph_store_test.py new file mode 100644 index 000000000..28c8af093 --- /dev/null +++ b/autobot-backend/knowledge/memory_graph/graph_store_test.py @@ -0,0 +1,472 @@ +# Copyright (c) mrveiss. All rights reserved. +""" +Unit tests for knowledge.memory_graph.graph_store and schema. + +All Redis interactions are mocked with unittest.mock so these tests run +without a real Redis instance. +""" + +import asyncio +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + + +# --------------------------------------------------------------------------- +# Helpers — build a minimal fake async Redis client +# --------------------------------------------------------------------------- + + +def _make_redis(store: dict | None = None): + """Return a minimal async-redis-style mock backed by *store*. + + Supports: + - redis.json().get(key) + - redis.json().set(key, path, value) + - redis.json().arrappend(key, path, item) + - redis.exists(key) + - redis.pipeline() + - redis.execute_command(*args) + """ + if store is None: + store = {} + + json_mock = MagicMock() + + async def _json_get(key): + return store.get(key) + + async def _json_set(key, path, value, *args, **kwargs): + if path == "$": + store[key] = value + else: + # Simplified path handling for $.relations only + if path == "$.relations" and key in store: + store[key]["relations"] = value + + async def _json_arrappend(key, path, item): + if key in store: + store[key].setdefault("relations", []).append(item) + return [len(store.get(key, {}).get("relations", []))] + + json_mock.get = AsyncMock(side_effect=_json_get) + json_mock.set = AsyncMock(side_effect=_json_set) + json_mock.arrappend = AsyncMock(side_effect=_json_arrappend) + + # redis_client.json() must return the same mock each time + client = MagicMock() + client.json = MagicMock(return_value=json_mock) + + async def _exists(key): + return key in store + + client.exists = AsyncMock(side_effect=_exists) + + # pipeline + pipe = MagicMock() + pipe_json = MagicMock() + pipe_arrappend_calls = [] + + def _pipe_arrappend(key, path, item): + pipe_arrappend_calls.append((key, path, item)) + + pipe_json.arrappend = MagicMock(side_effect=_pipe_arrappend) + pipe.json = MagicMock(return_value=pipe_json) + + async def _pipe_execute(): + for key, path, item in pipe_arrappend_calls: + if key in store: + store[key].setdefault("relations", []).append(item) + pipe_arrappend_calls.clear() + return [] + + pipe.execute = AsyncMock(side_effect=_pipe_execute) + client.pipeline = MagicMock(return_value=pipe) + + async def _execute_command(*args): + cmd = args[0].upper() if args else "" + if cmd == "FT.INFO": + raise Exception("Index not found") + return None + + client.execute_command = AsyncMock(side_effect=_execute_command) + + return client, store + + +# --------------------------------------------------------------------------- +# schema tests +# --------------------------------------------------------------------------- + + +class TestSchema: + def test_entity_key_prefix(self): + from knowledge.memory_graph.schema import ENTITY_KEY_PREFIX + assert ENTITY_KEY_PREFIX == "memory:entity:" + + def test_relation_prefixes(self): + from knowledge.memory_graph.schema import RELATIONS_IN_PREFIX, RELATIONS_OUT_PREFIX + assert RELATIONS_OUT_PREFIX == "memory:relations:out:" + assert RELATIONS_IN_PREFIX == "memory:relations:in:" + + def test_entity_types_are_frozenset(self): + from knowledge.memory_graph.schema import ENTITY_TYPES + assert isinstance(ENTITY_TYPES, frozenset) + assert "conversation" in ENTITY_TYPES + assert "bug_fix" in ENTITY_TYPES + assert "task" in ENTITY_TYPES + + def test_relation_types_are_frozenset(self): + from knowledge.memory_graph.schema import RELATION_TYPES + assert isinstance(RELATION_TYPES, frozenset) + assert "fixes" in RELATION_TYPES + assert "depends_on" in RELATION_TYPES + + def test_index_names(self): + from knowledge.memory_graph.schema import FULLTEXT_INDEX_NAME, PRIMARY_INDEX_NAME + assert PRIMARY_INDEX_NAME == "memory_entity_idx" + assert FULLTEXT_INDEX_NAME == "memory_fulltext_idx" + + @pytest.mark.asyncio + async def test_ensure_indexes_calls_ft_create(self): + from knowledge.memory_graph.schema import ensure_indexes + + client, _ = _make_redis() + created_indexes = [] + + async def _execute_command(*args): + cmd = args[0].upper() + if cmd == "FT.INFO": + raise Exception("Index not found") + if cmd == "FT.CREATE": + created_indexes.append(args[1]) + return None + + client.execute_command = AsyncMock(side_effect=_execute_command) + await ensure_indexes(client) + assert "memory_entity_idx" in created_indexes + assert "memory_fulltext_idx" in created_indexes + + @pytest.mark.asyncio + async def test_ensure_indexes_skips_existing(self): + from knowledge.memory_graph.schema import ensure_indexes + + client, _ = _make_redis() + created_indexes = [] + + async def _execute_command(*args): + cmd = args[0].upper() + if cmd == "FT.INFO": + return ["index_name", args[1]] # pretend it exists + if cmd == "FT.CREATE": + created_indexes.append(args[1]) + return None + + client.execute_command = AsyncMock(side_effect=_execute_command) + await ensure_indexes(client) + assert created_indexes == [] + + +# --------------------------------------------------------------------------- +# create_entity tests +# --------------------------------------------------------------------------- + + +class TestCreateEntity: + @pytest.mark.asyncio + async def test_creates_entity_in_store(self): + from knowledge.memory_graph.graph_store import create_entity + + client, store = _make_redis() + entity = await create_entity(client, "task", "Build feature X", ["first obs"]) + + assert entity["type"] == "task" + assert entity["name"] == "Build feature X" + assert entity["observations"] == ["first obs"] + assert "id" in entity + assert entity["created_at"] > 0 + + @pytest.mark.asyncio + async def test_persists_to_redis(self): + from knowledge.memory_graph.graph_store import create_entity + + client, store = _make_redis() + entity = await create_entity(client, "conversation", "Chat #1", []) + + key = "memory:entity:" + entity["id"] + assert key in store + assert store[key]["id"] == entity["id"] + + @pytest.mark.asyncio + async def test_invalid_type_raises(self): + from knowledge.memory_graph.graph_store import create_entity + + client, _ = _make_redis() + with pytest.raises(ValueError, match="Invalid entity_type"): + await create_entity(client, "UNKNOWN_TYPE", "x", []) + + @pytest.mark.asyncio + async def test_empty_name_raises(self): + from knowledge.memory_graph.graph_store import create_entity + + client, _ = _make_redis() + with pytest.raises(ValueError, match="name must not be empty"): + await create_entity(client, "task", " ", []) + + @pytest.mark.asyncio + async def test_metadata_merged(self): + from knowledge.memory_graph.graph_store import create_entity + + client, _ = _make_redis() + entity = await create_entity( + client, "decision", "Adopt Redis", [], metadata={"session_id": "abc"} + ) + assert entity["metadata"]["session_id"] == "abc" + assert entity["metadata"]["status"] == "active" + + @pytest.mark.asyncio + async def test_redis_error_raises_runtime_error(self): + from knowledge.memory_graph.graph_store import create_entity + + client, _ = _make_redis() + client.json().set = AsyncMock(side_effect=Exception("connection refused")) + with pytest.raises(RuntimeError, match="Entity creation failed"): + await create_entity(client, "task", "Fail test", []) + + +# --------------------------------------------------------------------------- +# get_entity tests +# --------------------------------------------------------------------------- + + +class TestGetEntity: + @pytest.mark.asyncio + async def test_returns_entity(self): + from knowledge.memory_graph.graph_store import create_entity, get_entity + + client, _ = _make_redis() + created = await create_entity(client, "feature", "Dark mode", ["initial"]) + fetched = await get_entity(client, created["id"]) + + assert fetched is not None + assert fetched["id"] == created["id"] + assert fetched["name"] == "Dark mode" + + @pytest.mark.asyncio + async def test_returns_none_for_missing(self): + from knowledge.memory_graph.graph_store import get_entity + + client, _ = _make_redis() + result = await get_entity(client, "non-existent-uuid") + assert result is None + + @pytest.mark.asyncio + async def test_returns_none_on_redis_error(self): + from knowledge.memory_graph.graph_store import get_entity + + client, _ = _make_redis() + client.json().get = AsyncMock(side_effect=Exception("timeout")) + result = await get_entity(client, "some-uuid") + assert result is None + + +# --------------------------------------------------------------------------- +# create_relation tests +# --------------------------------------------------------------------------- + + +class TestCreateRelation: + @pytest.mark.asyncio + async def test_creates_outgoing_and_incoming(self): + from knowledge.memory_graph.graph_store import ( + create_entity, + create_relation, + get_incoming_relations, + get_outgoing_relations, + ) + + client, _ = _make_redis() + a = await create_entity(client, "task", "Task A", []) + b = await create_entity(client, "task", "Task B", []) + + ok = await create_relation(client, a["id"], b["id"], "depends_on") + assert ok is True + + out = await get_outgoing_relations(client, a["id"]) + assert len(out) == 1 + assert out[0]["to"] == b["id"] + assert out[0]["type"] == "depends_on" + + inc = await get_incoming_relations(client, b["id"]) + assert len(inc) == 1 + assert inc[0]["from"] == a["id"] + assert inc[0]["type"] == "depends_on" + + @pytest.mark.asyncio + async def test_invalid_rel_type_raises(self): + from knowledge.memory_graph.graph_store import create_relation + + client, _ = _make_redis() + with pytest.raises(ValueError, match="Invalid rel_type"): + await create_relation(client, "id-a", "id-b", "INVALID_TYPE") + + @pytest.mark.asyncio + async def test_returns_false_on_error(self): + from knowledge.memory_graph.graph_store import create_relation + + client, _ = _make_redis() + # Simulate pipeline failure + client.pipeline().execute = AsyncMock(side_effect=Exception("pipe error")) + result = await create_relation(client, "id-a", "id-b", "fixes") + assert result is False + + @pytest.mark.asyncio + async def test_metadata_stored_on_outgoing_edge(self): + from knowledge.memory_graph.graph_store import ( + create_entity, + create_relation, + get_outgoing_relations, + ) + + client, _ = _make_redis() + a = await create_entity(client, "bug_fix", "Fix A", []) + b = await create_entity(client, "bug_fix", "Fix B", []) + await create_relation( + client, a["id"], b["id"], "relates_to", metadata={"strength": 0.9} + ) + out = await get_outgoing_relations(client, a["id"]) + assert out[0]["metadata"]["strength"] == 0.9 + + +# --------------------------------------------------------------------------- +# get_outgoing_relations / get_incoming_relations tests +# --------------------------------------------------------------------------- + + +class TestGetRelations: + @pytest.mark.asyncio + async def test_empty_when_no_relations(self): + from knowledge.memory_graph.graph_store import ( + get_incoming_relations, + get_outgoing_relations, + ) + + client, _ = _make_redis() + assert await get_outgoing_relations(client, "no-such-id") == [] + assert await get_incoming_relations(client, "no-such-id") == [] + + @pytest.mark.asyncio + async def test_empty_on_redis_error(self): + from knowledge.memory_graph.graph_store import get_outgoing_relations + + client, _ = _make_redis() + client.json().get = AsyncMock(side_effect=Exception("err")) + assert await get_outgoing_relations(client, "id") == [] + + +# --------------------------------------------------------------------------- +# traverse_relations tests +# --------------------------------------------------------------------------- + + +class TestTraverseRelations: + @pytest.mark.asyncio + async def test_single_hop(self): + from knowledge.memory_graph.graph_store import ( + create_entity, + create_relation, + traverse_relations, + ) + + client, _ = _make_redis() + root = await create_entity(client, "task", "Root", []) + child = await create_entity(client, "task", "Child", []) + await create_relation(client, root["id"], child["id"], "depends_on") + + result = await traverse_relations(client, root["id"], "depends_on", max_depth=1) + assert len(result) == 1 + assert result[0]["id"] == child["id"] + + @pytest.mark.asyncio + async def test_multi_hop(self): + from knowledge.memory_graph.graph_store import ( + create_entity, + create_relation, + traverse_relations, + ) + + client, _ = _make_redis() + root = await create_entity(client, "task", "Root", []) + mid = await create_entity(client, "task", "Middle", []) + leaf = await create_entity(client, "task", "Leaf", []) + await create_relation(client, root["id"], mid["id"], "depends_on") + await create_relation(client, mid["id"], leaf["id"], "depends_on") + + result = await traverse_relations(client, root["id"], "depends_on", max_depth=2) + ids = [e["id"] for e in result] + assert mid["id"] in ids + assert leaf["id"] in ids + + @pytest.mark.asyncio + async def test_relation_type_filter(self): + from knowledge.memory_graph.graph_store import ( + create_entity, + create_relation, + traverse_relations, + ) + + client, _ = _make_redis() + root = await create_entity(client, "task", "Root", []) + a = await create_entity(client, "task", "A", []) + b = await create_entity(client, "feature", "B", []) + await create_relation(client, root["id"], a["id"], "depends_on") + await create_relation(client, root["id"], b["id"], "relates_to") + + result = await traverse_relations(client, root["id"], "depends_on", max_depth=1) + ids = [e["id"] for e in result] + assert a["id"] in ids + assert b["id"] not in ids + + @pytest.mark.asyncio + async def test_no_cycle(self): + from knowledge.memory_graph.graph_store import ( + create_entity, + create_relation, + traverse_relations, + ) + + client, _ = _make_redis() + x = await create_entity(client, "task", "X", []) + y = await create_entity(client, "task", "Y", []) + await create_relation(client, x["id"], y["id"], "depends_on") + await create_relation(client, y["id"], x["id"], "depends_on") + + # Must not loop forever + result = await traverse_relations(client, x["id"], "depends_on", max_depth=5) + ids = [e["id"] for e in result] + # Y reachable; X already in visited so not duplicated + assert ids.count(y["id"]) == 1 + + @pytest.mark.asyncio + async def test_empty_when_no_relations(self): + from knowledge.memory_graph.graph_store import create_entity, traverse_relations + + client, _ = _make_redis() + root = await create_entity(client, "task", "Alone", []) + result = await traverse_relations(client, root["id"]) + assert result == [] + + @pytest.mark.asyncio + async def test_max_depth_zero_returns_empty(self): + from knowledge.memory_graph.graph_store import ( + create_entity, + create_relation, + traverse_relations, + ) + + client, _ = _make_redis() + root = await create_entity(client, "task", "Root", []) + child = await create_entity(client, "task", "Child", []) + await create_relation(client, root["id"], child["id"], "depends_on") + + result = await traverse_relations(client, root["id"], max_depth=0) + assert result == [] diff --git a/autobot-backend/knowledge/memory_graph/schema.py b/autobot-backend/knowledge/memory_graph/schema.py new file mode 100644 index 000000000..6909a2e2d --- /dev/null +++ b/autobot-backend/knowledge/memory_graph/schema.py @@ -0,0 +1,193 @@ +# Copyright (c) mrveiss. All rights reserved. +""" +knowledge.memory_graph.schema — Redis key patterns, constants, and index creation. + +Implements the schema described in docs/database/REDIS_MEMORY_GRAPH_SPECIFICATION.md. +All data lives in Redis DB 1 (``knowledge``); the existing autobot_memory_graph +package already uses this database so there is no separate DB 2 allocation in the +current redis-databases.yaml. + +Key patterns +------------ +``memory:entity:{uuid}`` — RedisJSON entity document +``memory:relations:out:{uuid}`` — RedisJSON outgoing-relation list +``memory:relations:in:{uuid}`` — RedisJSON incoming-relation list + +Search indexes +-------------- +``memory_entity_idx`` — primary filtered / sorted search +``memory_fulltext_idx`` — full-text search with phonetic matching +""" + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Key patterns +# --------------------------------------------------------------------------- + +ENTITY_KEY_PREFIX: str = "memory:entity:" +RELATIONS_OUT_PREFIX: str = "memory:relations:out:" +RELATIONS_IN_PREFIX: str = "memory:relations:in:" + +# --------------------------------------------------------------------------- +# Index names +# --------------------------------------------------------------------------- + +PRIMARY_INDEX_NAME: str = "memory_entity_idx" +FULLTEXT_INDEX_NAME: str = "memory_fulltext_idx" + +# --------------------------------------------------------------------------- +# Allowed values +# --------------------------------------------------------------------------- + +ENTITY_TYPES: frozenset = frozenset( + { + "conversation", + "bug_fix", + "feature", + "decision", + "task", + "research", + "implementation", + } +) + +RELATION_TYPES: frozenset = frozenset( + { + "fixes", + "implements", + "depends_on", + "relates_to", + "informs", + "guides", + "blocks", + } +) + +# --------------------------------------------------------------------------- +# FT.CREATE argument lists (no raw string commands — args passed to execute_command) +# --------------------------------------------------------------------------- + +_PRIMARY_INDEX_ARGS: list = [ + "FT.CREATE", + PRIMARY_INDEX_NAME, + "ON", + "JSON", + "PREFIX", + "1", + ENTITY_KEY_PREFIX, + "SCHEMA", + "$.type", + "AS", + "type", + "TAG", + "SORTABLE", + "$.name", + "AS", + "name", + "TEXT", + "WEIGHT", + "2.0", + "SORTABLE", + "$.observations[*]", + "AS", + "observations", + "TEXT", + "$.created_at", + "AS", + "created_at", + "NUMERIC", + "SORTABLE", + "$.updated_at", + "AS", + "updated_at", + "NUMERIC", + "SORTABLE", + "$.metadata.priority", + "AS", + "priority", + "TAG", + "$.metadata.status", + "AS", + "status", + "TAG", + "SORTABLE", + "$.metadata.tags[*]", + "AS", + "tags", + "TAG", + "SEPARATOR", + ",", + "$.metadata.session_id", + "AS", + "session_id", + "TAG", +] + +_FULLTEXT_INDEX_ARGS: list = [ + "FT.CREATE", + FULLTEXT_INDEX_NAME, + "ON", + "JSON", + "PREFIX", + "1", + ENTITY_KEY_PREFIX, + "LANGUAGE", + "english", + "SCHEMA", + "$.name", + "AS", + "name", + "TEXT", + "PHONETIC", + "dm:en", + "$.observations[*]", + "AS", + "content", + "TEXT", +] + + +# --------------------------------------------------------------------------- +# Index management helpers +# --------------------------------------------------------------------------- + + +async def _index_exists(redis_client: Any, index_name: str) -> bool: + """Return True when *index_name* already exists in Redis.""" + try: + await redis_client.execute_command("FT.INFO", index_name) + return True + except Exception: + return False + + +async def _create_index(redis_client: Any, args: list) -> None: + """Issue a single FT.CREATE command; log but do not raise on failure.""" + index_name = args[1] + try: + await redis_client.execute_command(*args) + logger.info("Created Redis search index: %s", index_name) + except Exception as exc: + logger.warning("Could not create index %s: %s", index_name, exc) + + +async def ensure_indexes(redis_client: Any) -> None: + """Create ``memory_entity_idx`` and ``memory_fulltext_idx`` if absent. + + Idempotent — safe to call on every application startup. + + Args: + redis_client: An async redis-py client connected to the knowledge DB. + """ + for index_name, args in ( + (PRIMARY_INDEX_NAME, _PRIMARY_INDEX_ARGS), + (FULLTEXT_INDEX_NAME, _FULLTEXT_INDEX_ARGS), + ): + if not await _index_exists(redis_client, index_name): + await _create_index(redis_client, args) + else: + logger.debug("Index already exists, skipping creation: %s", index_name) From 40348fffc69f56464ca79ec160e87a9d7ea5dd81 Mon Sep 17 00:00:00 2001 From: mrveiss Date: Mon, 6 Apr 2026 21:23:29 +0300 Subject: [PATCH 2/2] fix(knowledge): remove hardcoded user_id, atomic NX relation init, unify __init__ exports (#3385) --- autobot-backend/knowledge/memory_graph/__init__.py | 14 +++++++++++++- .../knowledge/memory_graph/graph_store.py | 13 ++++++------- .../knowledge/memory_graph/graph_store_test.py | 7 +++++++ 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/autobot-backend/knowledge/memory_graph/__init__.py b/autobot-backend/knowledge/memory_graph/__init__.py index f7982bd3d..9d9a8112d 100644 --- a/autobot-backend/knowledge/memory_graph/__init__.py +++ b/autobot-backend/knowledge/memory_graph/__init__.py @@ -1,10 +1,11 @@ # Copyright (c) mrveiss. All rights reserved. """ -knowledge.memory_graph — Redis DB 2 memory graph foundation layer. +knowledge.memory_graph — Redis DB 1 (knowledge) memory graph foundation layer. Week 1 deliverable for issue #3385. Provides: - Schema constants and index creation (schema.py) - Entity / relation CRUD and BFS traversal (graph_store.py) +- Semantic search query processor and hybrid scorer (query_processor.py, hybrid_scorer.py) Public API re-exported from this package so callers can write: from knowledge.memory_graph import create_entity, get_entity, create_relation @@ -29,6 +30,13 @@ ensure_indexes, ) +# Query processor symbols — added by PR #3609 / issue-3384 +try: + from .hybrid_scorer import HybridScorer # noqa: F401 + from .query_processor import MemoryGraphQueryProcessor, QueryIntent # noqa: F401 +except ImportError: + pass # query processor not yet merged; safe to skip + __all__ = [ # schema "ENTITY_KEY_PREFIX", @@ -46,4 +54,8 @@ "get_incoming_relations", "get_outgoing_relations", "traverse_relations", + # query processor / hybrid scorer + "HybridScorer", + "MemoryGraphQueryProcessor", + "QueryIntent", ] diff --git a/autobot-backend/knowledge/memory_graph/graph_store.py b/autobot-backend/knowledge/memory_graph/graph_store.py index 460730d94..e56ecf4de 100644 --- a/autobot-backend/knowledge/memory_graph/graph_store.py +++ b/autobot-backend/knowledge/memory_graph/graph_store.py @@ -57,11 +57,11 @@ def _in_key(entity_id: str) -> str: async def _init_relation_doc(redis_client: Any, key: str, owner_id: str) -> None: - """Create an empty relation document at *key* if it does not exist.""" - if not await redis_client.exists(key): - await redis_client.json().set( - key, "$", {"entity_id": owner_id, "relations": []} - ) + """Create an empty relation document at *key* if it does not exist (atomic NX).""" + import json as _json + await redis_client.execute_command( + "JSON.SET", key, "$", _json.dumps({"entity_id": owner_id, "relations": []}), "NX" + ) # --------------------------------------------------------------------------- @@ -100,7 +100,6 @@ async def create_entity( entity_id = str(uuid.uuid4()) now = _now_ms() base_metadata: Dict[str, Any] = { - "user_id": "autobot", "priority": "medium", "status": "active", "tags": [], @@ -308,7 +307,7 @@ async def traverse_relations( ) for nid, entity in zip(next_ids, entities): - if isinstance(entity, Exception) or entity is None: + if isinstance(entity, BaseException) or entity is None: continue visited.add(nid) result.append(entity) diff --git a/autobot-backend/knowledge/memory_graph/graph_store_test.py b/autobot-backend/knowledge/memory_graph/graph_store_test.py index 28c8af093..0cd96c115 100644 --- a/autobot-backend/knowledge/memory_graph/graph_store_test.py +++ b/autobot-backend/knowledge/memory_graph/graph_store_test.py @@ -83,9 +83,16 @@ async def _pipe_execute(): client.pipeline = MagicMock(return_value=pipe) async def _execute_command(*args): + import json as _json cmd = args[0].upper() if args else "" if cmd == "FT.INFO": raise Exception("Index not found") + if cmd == "JSON.SET" and len(args) >= 4: + key, _path, value_str = args[1], args[2], args[3] + nx = len(args) > 4 and str(args[4]).upper() == "NX" + if nx and key in store: + return None # NX: skip if already exists + store[key] = _json.loads(value_str) return None client.execute_command = AsyncMock(side_effect=_execute_command)