5 changes: 5 additions & 0 deletions .cursor/worktrees.json
@@ -0,0 +1,5 @@
{
"setup-worktree": [
"make install"
]
}
126 changes: 126 additions & 0 deletions SUMMARY_PIPELINE_STATE.md
@@ -0,0 +1,126 @@
# Graphiti Node Summary Pipelines

This document outlines how entity summaries are generated today and the pragmatic changes proposed to gate expensive LLM calls using novelty and recency heuristics.

## Current Execution Flow

```
+------------------+ +-------------------------------+
| Episode Ingest | ----> | retrieve_episodes(last=3) |
+------------------+ +-------------------------------+
| |
v v
+----------------------+ +---------------------------+
| extract_nodes | | extract_edges |
| (LLM + reflexion) | | (LLM fact extraction) |
+----------------------+ +---------------------------+
| |
| dedupe candidates |
v v
+--------------------------+ +---------------------------+
| resolve_extracted_nodes | | resolve_extracted_edges |
| (deterministic + LLM) | | (LLM dedupe, invalidates)|
+--------------------------+ +---------------------------+
| |
+----------- parallel ----+
|
v
+-----------------------------------------+
| extract_attributes_from_nodes |
| - LLM attribute fill (optional) |
| - LLM summary refresh (always runs) |
+-----------------------------------------+
```

### Current Data & Timing Characteristics

- **Previous context**: only the latest `EPISODE_WINDOW_LEN = 3` episodes are retrieved before any LLM call.
- **Fact availability**: raw `EntityEdge.fact` strings exist immediately after edge extraction; embeddings are produced inside `resolve_extracted_edges`, concurrently with summary generation.
- **Summary inputs**: the summary prompt only sees `node.summary`, node attributes, episode content, and the three-episode window. It does *not* observe resolved fact invalidations or final edge embeddings.
- **LLM usage**: every node that survives dedupe invokes the summary prompt, even when the episode is low-information or repeats earlier content.

## Proposed Execution Flow

```
+------------------+ +-------------------------------+
| Episode Ingest | ----> | retrieve_episodes(last=3) |
+------------------+ +-------------------------------+
| |
v v
+----------------------+ +---------------------------+
| extract_nodes | | extract_edges |
| (LLM + reflexion) | | (LLM fact extraction) |
+----------------------+ +---------------------------+
| |
| dedupe candidates |
v v
+--------------------------+ +---------------------------+
| resolve_extracted_nodes | | build_node_deltas |
| (deterministic + LLM) | | - new facts per node |
+--------------------------+ | - embed facts upfront |
| | - track candidate flips |
| +-------------+-------------+
| |
| v
| +---------------------------+
| | resolve_extracted_edges |
| | (uses prebuilt embeddings|
| | updates NodeDelta state)|
| +-------------+-------------+
| |
| v
| +---------------------------+
| | summary_gate.should_refresh|
| | - fact hash drift |
| | - embedding drift |
| | - negation / invalidation |
| | - burst & staleness rules |
| +------+------+------------+
| | |
| | +------------------------------+
| | |
| v v
| +---------------------------+ +-----------------------------+
| | skip summary (log cause) | | extract_summary LLM call |
| | update metadata only | | update summary & metadata |
| +---------------------------+ +-----------------------------+
v
+-------------------------------+
| add_nodes_and_edges_bulk |
+-------------------------------+
```
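The `summary_gate.should_refresh` stage in the diagram can be sketched as follows. `NodeDelta`, `should_refresh`, the 0.15 drift threshold, and the metadata field names are illustrative assumptions mirroring the proposal, not a shipped API:

```python
import hashlib
import math
import time
from dataclasses import dataclass, field


@dataclass
class NodeDelta:
    # Illustrative staging record; field names are assumptions.
    new_facts: list[str] = field(default_factory=list)
    invalidated_facts: list[str] = field(default_factory=list)
    fact_embeddings: list[list[float]] = field(default_factory=list)
    has_negation: bool = False


def fact_hash(active_facts: list[str]) -> str:
    # Stable hash over the sorted set of active facts (new minus invalidated).
    joined = '\n'.join(sorted(active_facts))
    return hashlib.sha256(joined.encode('utf-8')).hexdigest()


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def mean_pool(vectors: list[list[float]]) -> list[float]:
    n = len(vectors)
    return [sum(col) / n for col in zip(*vectors)]


def should_refresh(
    delta: NodeDelta,
    meta: dict,
    active_facts: list[str],
    drift_threshold: float = 0.15,
    staleness_secs: float = 7 * 24 * 3600,
) -> tuple[bool, str]:
    """Return (refresh?, reason) for one node's pending delta."""
    if delta.has_negation:
        return True, 'negation'
    new_hash = fact_hash(active_facts)
    if new_hash == meta.get('fact_hash'):
        # Unchanged facts can still refresh once the staleness SLA expires.
        if time.time() - meta.get('last_summary_ts', 0) > staleness_secs:
            return True, 'staleness'
        return False, 'unchanged_hash'
    if delta.fact_embeddings and meta.get('summary_embedding'):
        drift = 1.0 - cosine(mean_pool(delta.fact_embeddings), meta['summary_embedding'])
        if drift < drift_threshold:
            return False, 'low_drift'
    return True, 'fact_drift'
```

The returned reason string doubles as the skip/refresh cause emitted for observability.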

### Key Proposed Changes

1. **NodeDelta staging**
- Generate fact embeddings immediately after edge extraction (`create_entity_edge_embeddings`) and group fact deltas by target node.
- Record potential invalidations and episode timestamps within the delta so the summary gate has full context.

2. **Deterministic novelty checks**
- Maintain a stable hash of active facts (new facts minus invalidated). Skip summarisation when the hash matches the stored value.
- Compare pooled fact embeddings to the persisted summary embedding; trigger refresh only when cosine drift exceeds a tuned threshold.
- Force refresh whenever the delta indicates polarity changes (contradiction/negation cues).

3. **Recency & burst handling**
- Track recent episode timestamps per node in metadata. If multiple episodes arrive within a short window, accumulate deltas and defer the summary until the burst ends or a hard cap is reached.
- Enforce a staleness SLA (`last_summary_ts`) so long-lived nodes eventually refresh even if novelty remains low.

4. **Metadata persistence**
- Persist gate state on each entity (`_graphiti_meta`: `fact_hash`, `summary_embedding`, `last_summary_ts`, `_recent_episode_times`, `_burst_active`).
- Update metadata whether the summary runs or is skipped to keep the gating logic deterministic across ingests.

5. **Observability**
- Emit counters for `summary_skipped`, `summary_refreshed`, and reasons (unchanged hash, low drift, burst deferral, staleness). Sample a small percentage of skipped cases to validate heuristics.
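The burst-deferral bookkeeping from item 3 could look like this minimal sketch, assuming a 60-second window and a hard cap of 10 tracked episodes (both hypothetical tuning values):

```python
import time
from collections import deque

BURST_WINDOW_SECS = 60.0  # assumed: episodes closer together than this form a burst
BURST_HARD_CAP = 10       # assumed: never defer past this many tracked episodes


def update_burst_state(meta: dict, episode_ts: float) -> bool:
    """Record an episode arrival; return True if the summary should be deferred."""
    times = deque(meta.get('_recent_episode_times', []), maxlen=BURST_HARD_CAP)
    # In a burst when the previous episode landed inside the window.
    in_burst = bool(times) and (episode_ts - times[-1]) < BURST_WINDOW_SECS
    times.append(episode_ts)
    meta['_recent_episode_times'] = list(times)
    # Stop deferring once the hard cap is reached, even mid-burst.
    meta['_burst_active'] = in_burst and len(times) < BURST_HARD_CAP
    return meta['_burst_active']
```

A caller would consult the return value before `should_refresh`, accumulating deltas while it stays `True`.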

## Implementation Snapshot

| Area | Current | Proposed |
| ---- | ------- | -------- |
| Summary trigger | Always per deduped node | Controlled by `summary_gate` using fact hash, embedding drift, negation, burst, staleness |
| Fact embeddings | Produced during edge resolution (parallel) | Produced immediately after extraction and reused downstream |
| Fact availability in summaries | Not available | Encapsulated in `NodeDelta` passed into summary gate |
| Metadata on node | Summary text + organic attributes | Summary text + `_graphiti_meta` (hash, embedding, timestamps, burst state) |
| Recency handling | None | Deque of recent episode timestamps + burst deferral |
| Negation detection | LLM-only inside edge resolution | Propagated into gate to force summary refresh |

These adjustments retain the existing inline execution model—no scheduled jobs—while reducing unnecessary LLM calls and improving determinism by grounding summaries in the same fact set that backs the graph.
109 changes: 109 additions & 0 deletions conductor-setup.sh
@@ -0,0 +1,109 @@
#!/bin/bash
set -e

echo "🚀 Setting up Graphiti workspace..."

# Check if uv is installed
if ! command -v uv &> /dev/null; then
    echo "❌ Error: 'uv' is not installed. Please install uv first:"
    echo "  curl -LsSf https://astral.sh/uv/install.sh | sh"
    exit 1
fi

# Check Python version
python_version=$(python3 --version 2>&1 | awk '{print $2}')
Reviewer comment (Contributor):
The required_version variable is set but never used. Either remove it or use it in a more informative error message like:

echo "❌ Error: Python ${required_version}+ is required (found: $python_version)"

required_version="3.10"
if ! python3 -c "import sys; exit(0 if sys.version_info >= (3, 10) else 1)"; then
    echo "❌ Error: Python ${required_version}+ is required (found: $python_version)"
    exit 1
fi

echo "✓ Prerequisites check passed"

# Copy necessary files from root repo
echo "📄 Copying project files from root repo..."
cp "$CONDUCTOR_ROOT_PATH/pyproject.toml" .
Reviewer comment (Contributor):
Missing validation that $CONDUCTOR_ROOT_PATH is set. The script will fail with confusing errors if this environment variable is not defined. Add at the start of the script:

if [ -z "$CONDUCTOR_ROOT_PATH" ]; then
    echo "❌ Error: CONDUCTOR_ROOT_PATH environment variable is not set"
    exit 1
fi

cp "$CONDUCTOR_ROOT_PATH/uv.lock" .
cp "$CONDUCTOR_ROOT_PATH/README.md" .
if [ -f "$CONDUCTOR_ROOT_PATH/pytest.ini" ]; then
    cp "$CONDUCTOR_ROOT_PATH/pytest.ini" .
fi
if [ -f "$CONDUCTOR_ROOT_PATH/conftest.py" ]; then
    cp "$CONDUCTOR_ROOT_PATH/conftest.py" .
fi
if [ -f "$CONDUCTOR_ROOT_PATH/py.typed" ]; then
    cp "$CONDUCTOR_ROOT_PATH/py.typed" .
fi

# Create symlink to source code instead of copying
echo "🔗 Creating symlinks to source code..."
ln -sf "$CONDUCTOR_ROOT_PATH/graphiti_core" graphiti_core
Reviewer comment (Contributor):
Using ln -sf will silently overwrite existing symlinks. If a workspace is re-setup, this could cause issues if the symlinks already exist. Consider checking first or adding validation that the symlinks point to the expected location.

ln -sf "$CONDUCTOR_ROOT_PATH/tests" tests
ln -sf "$CONDUCTOR_ROOT_PATH/examples" examples

# Install dependencies
echo "📦 Installing dependencies with uv..."
uv sync --frozen --extra dev

# Create workspace-specific Makefile
echo "📝 Creating workspace Makefile..."
cat > Makefile << 'EOF'
.PHONY: install format lint test all check

# Define variables - using virtualenv directly instead of uv run
PYTHON = .venv/bin/python
PYTEST = .venv/bin/pytest
RUFF = .venv/bin/ruff
PYRIGHT = .venv/bin/pyright

# Default target
all: format lint test

# Install dependencies
install:
	@echo "Dependencies already installed via conductor-setup.sh"
	@echo "Run './conductor-setup.sh' to reinstall"

# Format code
format:
	$(RUFF) check --select I --fix
	$(RUFF) format

# Lint code
lint:
	$(RUFF) check
	$(PYRIGHT) ./graphiti_core

# Run tests
test:
	DISABLE_FALKORDB=1 DISABLE_KUZU=1 DISABLE_NEPTUNE=1 $(PYTEST) -m "not integration"

# Run format, lint, and test
check: format lint test
EOF

# Handle environment variables
if [ -f "$CONDUCTOR_ROOT_PATH/.env" ]; then
    echo "🔗 Linking .env file from root repo..."
    ln -sf "$CONDUCTOR_ROOT_PATH/.env" .env
    echo "✓ Environment file linked"
else
    echo "⚠️ No .env file found in root repo"
    echo "  Copy $CONDUCTOR_ROOT_PATH/.env.example to $CONDUCTOR_ROOT_PATH/.env"
    echo "  and add your API keys, then rerun setup"
    exit 1
fi

# Check for required environment variable
if ! grep -q "OPENAI_API_KEY=.*[^[:space:]]" .env 2>/dev/null; then
    echo "⚠️ Warning: OPENAI_API_KEY not set in .env file"
    echo "  This is required for most Graphiti functionality"
fi

echo "✅ Workspace setup complete!"
echo ""
echo "Available commands:"
echo " make test - Run unit tests"
echo " make lint - Lint and type check code"
echo " make format - Format code with ruff"
echo " make check - Run all checks (format, lint, test)"
7 changes: 7 additions & 0 deletions conductor.json
@@ -0,0 +1,7 @@
{
"scripts": {
"setup": "./conductor-setup.sh",
"run": "make test"
},
"runScriptMode": "nonconcurrent"
}
74 changes: 69 additions & 5 deletions graphiti_core/driver/falkordb_driver.py
@@ -14,10 +14,11 @@
limitations under the License.
"""

import asyncio
import datetime
import inspect
import logging
from typing import TYPE_CHECKING, Any

Check failure (GitHub Actions / ruff) on line 21: Ruff I001 — graphiti_core/falkordb_driver.py:17:1: Import block is un-sorted or un-formatted

if TYPE_CHECKING:
from falkordb import Graph as FalkorGraph
@@ -101,12 +102,11 @@
if isinstance(query, list):
for cypher, params in query:
params = convert_datetimes_to_strings(params)
await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType]
await _await_graph_query(self.graph, str(cypher), params) # type: ignore[reportUnknownArgumentType]
else:
params = dict(kwargs)
params = convert_datetimes_to_strings(params)
await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType]
# Assuming `graph.query` is async (ideal); otherwise, wrap in executor
await _await_graph_query(self.graph, str(query), params) # type: ignore[reportUnknownArgumentType]
return None


@@ -122,6 +122,9 @@
password: str | None = None,
falkor_db: FalkorDB | None = None,
database: str = 'default_db',
*,
lite: bool = False,
lite_db_path: str | None = None,
):
"""
Initialize the FalkorDB driver.
@@ -137,7 +140,23 @@
# If a FalkorDB instance is provided, use it directly
self.client = falkor_db
else:
self.client = FalkorDB(host=host, port=port, username=username, password=password)
if lite:
# Lazy import to avoid mandatory dependency when not using Lite
try:
from redislite.falkordb_client import FalkorDB as LiteFalkorDB # type: ignore
except Exception as e: # broad to surface helpful message
Reviewer comment (Contributor):
Catching bare Exception is too broad. This will catch KeyboardInterrupt, SystemExit, and other exceptions that should propagate. Use ImportError specifically since that's what you're trying to catch.

raise ImportError(
'falkordblite is required for FalkorDB Lite. Install it with: '\
'pip install falkordblite'
) from e

db_path = lite_db_path or '/tmp/falkordb.db'
lite_client = LiteFalkorDB(db_path)
Reviewer comment (Contributor):
Hardcoding /tmp/falkordb.db as the default is problematic:

  • /tmp may be cleared on reboot, losing data
  • No write permission guarantees in /tmp on all systems
  • No isolation between different processes/users

Consider using a more appropriate default like ~/.local/share/graphiti/falkordb.db or require the user to specify the path explicitly.

self.client = _AsyncLiteClientAdapter(lite_client)
self._is_lite = True
Reviewer comment (Contributor):
The _is_lite instance variable is set but never used anywhere in the class. Either remove it or document why it's needed for future functionality.

else:
self.client = FalkorDB(host=host, port=port, username=username, password=password)
self._is_lite = False

self.fulltext_syntax = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries see https://redis.io/docs/latest/develop/ai/search-and-query/query/full-text/

@@ -154,7 +173,7 @@
params = convert_datetimes_to_strings(dict(kwargs))

try:
result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType]
result = await _await_graph_query(graph, cypher_query_, params) # type: ignore[reportUnknownArgumentType]
except Exception as e:
if 'already indexed' in str(e):
# check if index already exists
@@ -191,6 +210,9 @@
await self.client.connection.aclose()
elif hasattr(self.client.connection, 'close'):
await self.client.connection.close()
else:
# Lite adapter exposes no-op aclose; nothing to do otherwise
pass

async def delete_all_indexes(self) -> None:
result = await self.execute_query('CALL db.indexes()')
@@ -329,3 +351,45 @@
full_query = group_filter + ' (' + sanitized_query + ')'

return full_query


# -----------------
# Internal helpers
# -----------------

async def _await_graph_query(graph: Any, cypher: str, params: dict[str, Any] | None):
Reviewer comment (Contributor):
This function calls query_callable twice when the result is not awaitable - once on line 365 to check if it's awaitable, and again on line 369 to actually run it. This executes the query twice for sync clients.

Fix:

async def _await_graph_query(graph: Any, cypher: str, params: dict[str, Any] | None):
    query_callable = getattr(graph, 'query')
    result = query_callable(cypher, params)
    if inspect.isawaitable(result):
        return await result
    # Result already executed for sync path, just return it
    return result

    """
    Await a graph.query call whether it's native-async or sync (Lite).
    """
    query_callable = graph.query

Check failure (GitHub Actions / ruff) on line 364: Ruff B009 — graphiti_core/falkordb_driver.py:364:22: Do not call `getattr` with a constant attribute value. It is not any safer than normal property access.
    if inspect.iscoroutinefunction(query_callable):
        return await query_callable(cypher, params)
    # Sync path: run in a thread to avoid blocking the event loop
    return await asyncio.to_thread(query_callable, cypher, params)


class _AsyncLiteGraphAdapter:
    def __init__(self, sync_graph: Any):
        self._sync_graph = sync_graph

    async def query(self, cypher: str, params: dict[str, Any] | None = None):
        return await asyncio.to_thread(self._sync_graph.query, cypher, params)

    async def ro_query(self, cypher: str, params: dict[str, Any] | None = None):
        return await asyncio.to_thread(self._sync_graph.ro_query, cypher, params)

    async def delete(self):
        return await asyncio.to_thread(self._sync_graph.delete)


class _AsyncLiteClientAdapter:
    def __init__(self, sync_db: Any):
        self._sync_db = sync_db

    def select_graph(self, name: str):
        return _AsyncLiteGraphAdapter(self._sync_db.select_graph(name))

    async def aclose(self):
        # redislite does not expose an explicit close; rely on GC. No-op.
        return None
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -31,6 +31,7 @@ groq = ["groq>=0.2.0"]
google-genai = ["google-genai>=1.8.0"]
kuzu = ["kuzu>=0.11.3"]
falkordb = ["falkordb>=1.1.2,<2.0.0"]
falkordb-lite = ["falkordblite>=0.1.0"]
voyageai = ["voyageai>=0.2.3"]
neo4j-opensearch = ["boto3>=1.39.16", "opensearch-py>=3.0.0"]
sentence-transformers = ["sentence-transformers>=3.2.1"]
@@ -42,6 +43,7 @@ dev = [
"anthropic>=0.49.0",
"google-genai>=1.8.0",
"falkordb>=1.1.2,<2.0.0",
"falkordblite>=0.1.0",
"kuzu>=0.11.3",
"boto3>=1.39.16",
"opensearch-py>=3.0.0",