Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,16 @@ curl http://localhost:7474
- Verify managed identity has appropriate permissions
- Set USE_AZURE_AD=true in .env

**Neo4j 5.x index creation warnings:**
```
Warning: Index creation race condition detected (Neo4j 5.x issue). Indexes likely already exist. Continuing...
```
- This is a known Neo4j 5.x issue with concurrent `CREATE INDEX ... IF NOT EXISTS` statements
- The warning is harmless - the server continues to function normally
- Indexes are created successfully on the first attempt; subsequent parallel attempts may trigger the warning
- This does not affect functionality or performance
- Related: [Neo4j Issue #13208](https://github.com/neo4j/neo4j/issues/13208), [Graphiti PR #1081](https://github.com/getzep/graphiti/pull/1081)

## Operational Utilities

### Backup and Restore
Expand Down
24 changes: 18 additions & 6 deletions src/graphiti_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
from pathlib import Path
from typing import Any, Optional

import fastmcp
from dotenv import load_dotenv
from fastmcp import FastMCP
from graphiti_core import Graphiti
from graphiti_core.edges import EntityEdge
from graphiti_core.nodes import EpisodeType, EpisodicNode
from graphiti_core.search.search_filters import SearchFilters
from graphiti_core.utils.maintenance.graph_data_operations import clear_data
import fastmcp
from fastmcp import FastMCP
from pydantic import BaseModel
from starlette.responses import JSONResponse

Expand Down Expand Up @@ -280,8 +280,18 @@ async def initialize(self) -> None:
# Re-raise other errors
raise

# Build indices
await self.client.build_indices_and_constraints()
# Build indices - wrap in try/except to handle Neo4j 5.x race condition
# with parallel IF NOT EXISTS index creation
try:
await self.client.build_indices_and_constraints()
except Exception as idx_error:
if 'EquivalentSchemaRuleAlreadyExists' in str(idx_error):
logger.warning(
'Index creation race condition detected (Neo4j 5.x issue). '
'Indexes likely already exist. Continuing...'
)
else:
raise

logger.info('Successfully initialized Graphiti client')

Expand Down Expand Up @@ -920,7 +930,9 @@ async def run_mcp_server():
logger.info(
f'Running MCP server with SSE transport on {fastmcp.settings.host}:{fastmcp.settings.port}'
)
logger.info(f'Access the server at: http://{fastmcp.settings.host}:{fastmcp.settings.port}/sse')
logger.info(
f'Access the server at: http://{fastmcp.settings.host}:{fastmcp.settings.port}/sse'
)
await mcp.run_sse_async()
elif mcp_config.transport == 'http':
# Use localhost for display if binding to 0.0.0.0
Expand All @@ -944,7 +956,7 @@ async def run_mcp_server():
# Configure uvicorn logging to match our format
configure_uvicorn_logging()

await mcp.run_http_async(transport="http")
await mcp.run_http_async(transport='http')
else:
raise ValueError(
f'Unsupported transport: {mcp_config.transport}. Use "sse", "stdio", or "http"'
Expand Down
14 changes: 13 additions & 1 deletion src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,19 @@ async def initialize(self) -> None:
max_coroutines=self.semaphore_limit,
)

await self.client.build_indices_and_constraints()
# Build indices - wrap in try/except to handle Neo4j 5.x race condition
# with parallel IF NOT EXISTS index creation
try:
await self.client.build_indices_and_constraints()
except Exception as idx_error:
if 'EquivalentSchemaRuleAlreadyExists' in str(idx_error):
logger.warning(
'Index creation race condition detected (Neo4j 5.x issue). '
'Indexes likely already exist. Continuing...'
)
else:
raise

logger.info('Successfully initialized Graphiti client')

except Exception as e:
Expand Down
118 changes: 118 additions & 0 deletions tests/test_neo4j_race_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Tests for Neo4j 5.x race condition handling during index creation."""

import logging
from unittest.mock import AsyncMock, MagicMock, patch

import pytest


@pytest.mark.integration
@pytest.mark.requires_neo4j
async def test_neo4j_index_race_condition_handled():
"""Test that EquivalentSchemaRuleAlreadyExists is gracefully handled."""
from config.schema import ServerConfig
from src.server import GraphitiService

# Create a properly configured mock config
config = MagicMock(spec=ServerConfig)
config.database = MagicMock()
config.database.provider = 'neo4j'
config.llm = MagicMock()
config.llm.provider = 'openai'
config.embedder = MagicMock()
config.embedder.provider = 'openai'
config.graphiti = MagicMock()
config.graphiti.group_id = 'test_group'

service = GraphitiService(config)

# Mock the Graphiti client to raise the race condition error
mock_client = AsyncMock()
mock_error = Exception('Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists')
mock_client.build_indices_and_constraints.side_effect = mock_error

with (
patch('src.server.Graphiti', return_value=mock_client),
patch('services.factories.LLMClientFactory.create', return_value=AsyncMock()),
patch('services.factories.EmbedderFactory.create', return_value=AsyncMock()),
):
# Should complete without raising
await service.initialize()

# Verify client was created
assert service.client is not None


@pytest.mark.integration
@pytest.mark.requires_neo4j
async def test_neo4j_other_index_errors_are_raised():
"""Test that non-race-condition errors are properly raised."""
from config.schema import ServerConfig
from src.server import GraphitiService

# Create a properly configured mock config
config = MagicMock(spec=ServerConfig)
config.database = MagicMock()
config.database.provider = 'neo4j'
config.llm = MagicMock()
config.llm.provider = 'openai'
config.embedder = MagicMock()
config.embedder.provider = 'openai'
config.graphiti = MagicMock()
config.graphiti.group_id = 'test_group'

service = GraphitiService(config)

# Mock the Graphiti client to raise a different error
mock_client = AsyncMock()
mock_error = Exception('Some other database error')
mock_client.build_indices_and_constraints.side_effect = mock_error

# Should raise the original error
with (
patch('src.server.Graphiti', return_value=mock_client),
patch('services.factories.LLMClientFactory.create', return_value=AsyncMock()),
patch('services.factories.EmbedderFactory.create', return_value=AsyncMock()),
pytest.raises(Exception, match='Some other database error'),
):
await service.initialize()


@pytest.mark.integration
@pytest.mark.requires_neo4j
async def test_neo4j_race_condition_logs_warning(caplog):
"""Test that race condition triggers a warning log."""
from config.schema import ServerConfig
from src.server import GraphitiService

# Create a properly configured mock config
config = MagicMock(spec=ServerConfig)
config.database = MagicMock()
config.database.provider = 'neo4j'
config.llm = MagicMock()
config.llm.provider = 'openai'
config.embedder = MagicMock()
config.embedder.provider = 'openai'
config.graphiti = MagicMock()
config.graphiti.group_id = 'test_group'

service = GraphitiService(config)

# Mock the Graphiti client to raise the race condition error
mock_client = AsyncMock()
mock_error = Exception('Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists')
mock_client.build_indices_and_constraints.side_effect = mock_error

with (
caplog.at_level(logging.WARNING),
patch('src.server.Graphiti', return_value=mock_client),
patch('services.factories.LLMClientFactory.create', return_value=AsyncMock()),
patch('services.factories.EmbedderFactory.create', return_value=AsyncMock()),
):
await service.initialize()

# Verify warning was logged
assert any(
'Index creation race condition detected' in record.message
for record in caplog.records
)