diff --git a/.cursor/worktrees.json b/.cursor/worktrees.json new file mode 100644 index 000000000..16cbbee0b --- /dev/null +++ b/.cursor/worktrees.json @@ -0,0 +1,5 @@ +{ + "setup-worktree": [ + "make install" + ] +} \ No newline at end of file diff --git a/SUMMARY_PIPELINE_STATE.md b/SUMMARY_PIPELINE_STATE.md new file mode 100644 index 000000000..d42aff0b2 --- /dev/null +++ b/SUMMARY_PIPELINE_STATE.md @@ -0,0 +1,126 @@ +# Graphiti Node Summary Pipelines + +This document outlines how entity summaries are generated today and the pragmatic changes proposed to gate expensive LLM calls using novelty and recency heuristics. + +## Current Execution Flow + +``` ++------------------+ +-------------------------------+ +| Episode Ingest | ----> | retrieve_episodes(last=3) | ++------------------+ +-------------------------------+ + | | + v v ++----------------------+ +---------------------------+ +| extract_nodes | | extract_edges | +| (LLM + reflexion) | | (LLM fact extraction) | ++----------------------+ +---------------------------+ + | | + | dedupe candidates | + v v ++--------------------------+ +---------------------------+ +| resolve_extracted_nodes | | resolve_extracted_edges | +| (deterministic + LLM) | | (LLM dedupe, invalidates)| ++--------------------------+ +---------------------------+ + | | + +----------- parallel ----+ + | + v + +-----------------------------------------+ + | extract_attributes_from_nodes | + | - LLM attribute fill (optional) | + | - LLM summary refresh (always runs) | + +-----------------------------------------+ +``` + +### Current Data & Timing Characteristics + +- **Previous context**: only the latest `EPISODE_WINDOW_LEN = 3` episodes are retrieved before any LLM call. +- **Fact availability**: raw `EntityEdge.fact` strings exist immediately after edge extraction; embeddings are produced inside `resolve_extracted_edges`, concurrently with summary generation. 
+- **Summary inputs**: the summary prompt only sees `node.summary`, node attributes, episode content, and the three-episode window. It does *not* observe resolved fact invalidations or final edge embeddings. +- **LLM usage**: every node that survives dedupe invokes the summary prompt, even when the episode is low-information or repeat content. + +## Proposed Execution Flow + +``` ++------------------+ +-------------------------------+ +| Episode Ingest | ----> | retrieve_episodes(last=3) | ++------------------+ +-------------------------------+ + | | + v v ++----------------------+ +---------------------------+ +| extract_nodes | | extract_edges | +| (LLM + reflexion) | | (LLM fact extraction) | ++----------------------+ +---------------------------+ + | | + | dedupe candidates | + v v ++--------------------------+ +---------------------------+ +| resolve_extracted_nodes | | build_node_deltas | +| (deterministic + LLM) | | - new facts per node | ++--------------------------+ | - embed facts upfront | + | | - track candidate flips | + | +-------------+-------------+ + | | + | v + | +---------------------------+ + | | resolve_extracted_edges | + | | (uses prebuilt embeddings| + | | updates NodeDelta state)| + | +-------------+-------------+ + | | + | v + | +---------------------------+ + | | summary_gate.should_refresh| + | | - fact hash drift | + | | - embedding drift | + | | - negation / invalidation | + | | - burst & staleness rules | + | +------+------+------------+ + | | | + | | +------------------------------+ + | | | + | v v + | +---------------------------+ +-----------------------------+ + | | skip summary (log cause) | | extract_summary LLM call | + | | update metadata only | | update summary & metadata | + | +---------------------------+ +-----------------------------+ + v ++-------------------------------+ +| add_nodes_and_edges_bulk | ++-------------------------------+ +``` + +### Key Proposed Changes + +1. 
**NodeDelta staging** + - Generate fact embeddings immediately after edge extraction (`create_entity_edge_embeddings`) and group fact deltas by target node. + - Record potential invalidations and episode timestamps within the delta so the summary gate has full context. + +2. **Deterministic novelty checks** + - Maintain a stable hash of active facts (new facts minus invalidated). Skip summarisation when the hash matches the stored value. + - Compare pooled fact embeddings to the persisted summary embedding; trigger refresh only when cosine drift exceeds a tuned threshold. + - Force refresh whenever the delta indicates polarity changes (contradiction/negation cues). + +3. **Recency & burst handling** + - Track recent episode timestamps per node in metadata. If multiple episodes arrive within a short window, accumulate deltas and defer the summary until the burst ends or a hard cap is reached. + - Enforce a staleness SLA (`last_summary_ts`) so long-lived nodes eventually refresh even if novelty remains low. + +4. **Metadata persistence** + - Persist gate state on each entity (`_graphiti_meta`: `fact_hash`, `summary_embedding`, `last_summary_ts`, `_recent_episode_times`, `_burst_active`). + - Update metadata whether the summary runs or is skipped to keep the gating logic deterministic across ingests. + +5. **Observability** + - Emit counters for `summary_skipped`, `summary_refreshed`, and reasons (unchanged hash, low drift, burst deferral, staleness). Sample a small percentage of skipped cases to validate heuristics. 
+ +## Implementation Snapshot + +| Area | Current | Proposed | +| ---- | ------- | -------- | +| Summary trigger | Always per deduped node | Controlled by `summary_gate` using fact hash, embedding drift, negation, burst, staleness | +| Fact embeddings | Produced during edge resolution (parallel) | Produced immediately after extraction and reused downstream | +| Fact availability in summaries | Not available | Encapsulated in `NodeDelta` passed into summary gate | +| Metadata on node | Summary text + organic attributes | Summary text + `_graphiti_meta` (hash, embedding, timestamps, burst state) | +| Recency handling | None | Deque of recent episode timestamps + burst deferral | +| Negation detection | LLM-only inside edge resolution | Propagated into gate to force summary refresh | + +These adjustments retain the existing inline execution modelโ€”no scheduled jobsโ€”while reducing unnecessary LLM calls and improving determinism by grounding summaries in the same fact set that backs the graph. diff --git a/conductor-setup.sh b/conductor-setup.sh new file mode 100755 index 000000000..c5a3f81ac --- /dev/null +++ b/conductor-setup.sh @@ -0,0 +1,109 @@ +#!/bin/bash +set -e + +echo "๐Ÿš€ Setting up Graphiti workspace..." + +# Check if uv is installed +if ! command -v uv &> /dev/null; then + echo "โŒ Error: 'uv' is not installed. Please install uv first:" + echo " curl -LsSf https://astral.sh/uv/install.sh | sh" + exit 1 +fi + +# Check Python version +python_version=$(python3 --version 2>&1 | awk '{print $2}') +required_version="3.10" +if ! python3 -c "import sys; exit(0 if sys.version_info >= (3, 10) else 1)"; then + echo "โŒ Error: Python 3.10 or higher is required (found: $python_version)" + exit 1 +fi + +echo "โœ“ Prerequisites check passed" + +# Copy necessary files from root repo +echo "๐Ÿ“„ Copying project files from root repo..." +cp "$CONDUCTOR_ROOT_PATH/pyproject.toml" . +cp "$CONDUCTOR_ROOT_PATH/uv.lock" . +cp "$CONDUCTOR_ROOT_PATH/README.md" . 
+if [ -f "$CONDUCTOR_ROOT_PATH/pytest.ini" ]; then + cp "$CONDUCTOR_ROOT_PATH/pytest.ini" . +fi +if [ -f "$CONDUCTOR_ROOT_PATH/conftest.py" ]; then + cp "$CONDUCTOR_ROOT_PATH/conftest.py" . +fi +if [ -f "$CONDUCTOR_ROOT_PATH/py.typed" ]; then + cp "$CONDUCTOR_ROOT_PATH/py.typed" . +fi + +# Create symlink to source code instead of copying +echo "๐Ÿ”— Creating symlinks to source code..." +ln -sf "$CONDUCTOR_ROOT_PATH/graphiti_core" graphiti_core +ln -sf "$CONDUCTOR_ROOT_PATH/tests" tests +ln -sf "$CONDUCTOR_ROOT_PATH/examples" examples + +# Install dependencies +echo "๐Ÿ“ฆ Installing dependencies with uv..." +uv sync --frozen --extra dev + +# Create workspace-specific Makefile +echo "๐Ÿ“ Creating workspace Makefile..." +cat > Makefile << 'EOF' +.PHONY: install format lint test all check + +# Define variables - using virtualenv directly instead of uv run +PYTHON = .venv/bin/python +PYTEST = .venv/bin/pytest +RUFF = .venv/bin/ruff +PYRIGHT = .venv/bin/pyright + +# Default target +all: format lint test + +# Install dependencies +install: + @echo "Dependencies already installed via conductor-setup.sh" + @echo "Run './conductor-setup.sh' to reinstall" + +# Format code +format: + $(RUFF) check --select I --fix + $(RUFF) format + +# Lint code +lint: + $(RUFF) check + $(PYRIGHT) ./graphiti_core + +# Run tests +test: + DISABLE_FALKORDB=1 DISABLE_KUZU=1 DISABLE_NEPTUNE=1 $(PYTEST) -m "not integration" + +# Run format, lint, and test +check: format lint test +EOF + +# Handle environment variables +if [ -f "$CONDUCTOR_ROOT_PATH/.env" ]; then + echo "๐Ÿ”— Linking .env file from root repo..." + ln -sf "$CONDUCTOR_ROOT_PATH/.env" .env + echo "โœ“ Environment file linked" +else + echo "โš ๏ธ No .env file found in root repo" + echo " Copy $CONDUCTOR_ROOT_PATH/.env.example to $CONDUCTOR_ROOT_PATH/.env" + echo " and add your API keys, then rerun setup" + exit 1 +fi + +# Check for required environment variable +if ! 
grep -q "OPENAI_API_KEY=.*[^[:space:]]" .env 2>/dev/null; then + echo "โš ๏ธ Warning: OPENAI_API_KEY not set in .env file" + echo " This is required for most Graphiti functionality" +fi + +echo "โœ… Workspace setup complete!" +echo "" +echo "Available commands:" +echo " make test - Run unit tests" +echo " make lint - Lint and type check code" +echo " make format - Format code with ruff" +echo " make check - Run all checks (format, lint, test)" diff --git a/conductor.json b/conductor.json new file mode 100644 index 000000000..095f1816b --- /dev/null +++ b/conductor.json @@ -0,0 +1,7 @@ +{ + "scripts": { + "setup": "./conductor-setup.sh", + "run": "make test" + }, + "runScriptMode": "nonconcurrent" +} diff --git a/graphiti_core/driver/falkordb_driver.py b/graphiti_core/driver/falkordb_driver.py index d0b4ffe8d..ffa3dae73 100644 --- a/graphiti_core/driver/falkordb_driver.py +++ b/graphiti_core/driver/falkordb_driver.py @@ -18,6 +18,8 @@ import datetime import logging from typing import TYPE_CHECKING, Any +import asyncio +import inspect if TYPE_CHECKING: from falkordb import Graph as FalkorGraph @@ -101,12 +102,11 @@ async def run(self, query: str | list, **kwargs: Any) -> Any: if isinstance(query, list): for cypher, params in query: params = convert_datetimes_to_strings(params) - await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType] + await _await_graph_query(self.graph, str(cypher), params) # type: ignore[reportUnknownArgumentType] else: params = dict(kwargs) params = convert_datetimes_to_strings(params) - await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType] - # Assuming `graph.query` is async (ideal); otherwise, wrap in executor + await _await_graph_query(self.graph, str(query), params) # type: ignore[reportUnknownArgumentType] return None @@ -122,6 +122,9 @@ def __init__( password: str | None = None, falkor_db: FalkorDB | None = None, database: str = 'default_db', + *, + lite: bool = False, + lite_db_path: str | None 
= None, ): """ Initialize the FalkorDB driver. @@ -137,7 +140,23 @@ def __init__( # If a FalkorDB instance is provided, use it directly self.client = falkor_db else: - self.client = FalkorDB(host=host, port=port, username=username, password=password) + if lite: + # Lazy import to avoid mandatory dependency when not using Lite + try: + from redislite.falkordb_client import FalkorDB as LiteFalkorDB # type: ignore + except Exception as e: # broad to surface helpful message + raise ImportError( + 'falkordblite is required for FalkorDB Lite. Install it with: '\ + 'pip install falkordblite' + ) from e + + db_path = lite_db_path or '/tmp/falkordb.db' + lite_client = LiteFalkorDB(db_path) + self.client = _AsyncLiteClientAdapter(lite_client) + self._is_lite = True + else: + self.client = FalkorDB(host=host, port=port, username=username, password=password) + self._is_lite = False self.fulltext_syntax = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries see https://redis.io/docs/latest/develop/ai/search-and-query/query/full-text/ @@ -154,7 +173,7 @@ async def execute_query(self, cypher_query_, **kwargs: Any): params = convert_datetimes_to_strings(dict(kwargs)) try: - result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType] + result = await _await_graph_query(graph, cypher_query_, params) # type: ignore[reportUnknownArgumentType] except Exception as e: if 'already indexed' in str(e): # check if index already exists @@ -191,6 +210,9 @@ async def close(self) -> None: await self.client.connection.aclose() elif hasattr(self.client.connection, 'close'): await self.client.connection.close() + else: + # Lite adapter exposes no-op aclose; nothing to do otherwise + pass async def delete_all_indexes(self) -> None: result = await self.execute_query('CALL db.indexes()') @@ -329,3 +351,45 @@ def build_fulltext_query( full_query = group_filter + ' (' + sanitized_query + ')' return full_query + + +# ----------------- +# Internal helpers +# 
----------------- + +async def _await_graph_query(graph: Any, cypher: str, params: dict[str, Any] | None): + """ + Await a graph.query call whether it's native-async or sync (Lite). + """ + query_callable = getattr(graph, 'query') + result = query_callable(cypher, params) + if inspect.isawaitable(result): + return await result + # Sync path (Lite): the call above already executed the query; do not run it again + return result + + +class _AsyncLiteGraphAdapter: + def __init__(self, sync_graph: Any): + self._sync_graph = sync_graph + + async def query(self, cypher: str, params: dict[str, Any] | None = None): + return await asyncio.to_thread(self._sync_graph.query, cypher, params) + + async def ro_query(self, cypher: str, params: dict[str, Any] | None = None): + return await asyncio.to_thread(self._sync_graph.ro_query, cypher, params) + + async def delete(self): + return await asyncio.to_thread(self._sync_graph.delete) + + +class _AsyncLiteClientAdapter: + def __init__(self, sync_db: Any): + self._sync_db = sync_db + + def select_graph(self, name: str): + return _AsyncLiteGraphAdapter(self._sync_db.select_graph(name)) + + async def aclose(self): + # redislite does not expose explicit close; rely on GC. No-op. 
+ return None diff --git a/pyproject.toml b/pyproject.toml index 5281f22b4..51e2b7f7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ groq = ["groq>=0.2.0"] google-genai = ["google-genai>=1.8.0"] kuzu = ["kuzu>=0.11.3"] falkordb = ["falkordb>=1.1.2,<2.0.0"] +falkordb-lite = ["falkordblite>=0.1.0"] voyageai = ["voyageai>=0.2.3"] neo4j-opensearch = ["boto3>=1.39.16", "opensearch-py>=3.0.0"] sentence-transformers = ["sentence-transformers>=3.2.1"] @@ -42,6 +43,7 @@ dev = [ "anthropic>=0.49.0", "google-genai>=1.8.0", "falkordb>=1.1.2,<2.0.0", + "falkordblite>=0.1.0", "kuzu>=0.11.3", "boto3>=1.39.16", "opensearch-py>=3.0.0", diff --git a/tests/driver/test_falkordb_driver.py b/tests/driver/test_falkordb_driver.py index 7edf9afbb..9a9fc5f0e 100644 --- a/tests/driver/test_falkordb_driver.py +++ b/tests/driver/test_falkordb_driver.py @@ -70,6 +70,30 @@ def test_provider(self): """Test driver provider identification.""" assert self.driver.provider == GraphProvider.FALKORDB + def test_init_with_lite_uses_redislite_adapter(self): + """Test initialization with lite=True uses redislite client adapter. + + We don't require falkordb for this test as we patch the redislite import path. + """ + import sys + import types + + # Create a fake redislite.falkordb_client module with FalkorDB class + fake_redislite = types.ModuleType('redislite') + fake_client_mod = types.ModuleType('redislite.falkordb_client') + fake_client_mod.FalkorDB = MagicMock() + + with patch.dict(sys.modules, { + 'redislite': fake_redislite, + 'redislite.falkordb_client': fake_client_mod, + }): + from graphiti_core.driver.falkordb_driver import FalkorDriver as _FD + + try: + _FD(lite=True, database='default_db') + except Exception as e: + pytest.fail(f'Lite initialization raised unexpectedly: {e}') + @unittest.skipIf(not HAS_FALKORDB, 'FalkorDB is not installed') def test_get_graph_with_name(self): """Test _get_graph with specific graph name."""