diff --git a/CLAUDE.md b/CLAUDE.md index 25961b3..c1208ee 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -596,6 +596,16 @@ curl http://localhost:7474 - Verify managed identity has appropriate permissions - Set USE_AZURE_AD=true in .env +**Neo4j 5.x index creation warnings:** +``` +Warning: Index creation race condition detected (Neo4j 5.x issue). Indexes likely already exist. Continuing... +``` +- This is a known Neo4j 5.x issue with concurrent `CREATE INDEX ... IF NOT EXISTS` statements +- The warning is harmless - the server continues to function normally +- Indexes are created successfully on the first attempt; subsequent parallel attempts may trigger the warning +- This does not affect functionality or performance +- Related: [Neo4j Issue #13208](https://github.com/neo4j/neo4j/issues/13208), [Graphiti PR #1081](https://github.com/getzep/graphiti/pull/1081) + ## Operational Utilities ### Backup and Restore diff --git a/src/graphiti_mcp_server.py b/src/graphiti_mcp_server.py index 4bcedd2..d8d862c 100644 --- a/src/graphiti_mcp_server.py +++ b/src/graphiti_mcp_server.py @@ -11,14 +11,14 @@ from pathlib import Path from typing import Any, Optional +import fastmcp from dotenv import load_dotenv +from fastmcp import FastMCP from graphiti_core import Graphiti from graphiti_core.edges import EntityEdge from graphiti_core.nodes import EpisodeType, EpisodicNode from graphiti_core.search.search_filters import SearchFilters from graphiti_core.utils.maintenance.graph_data_operations import clear_data -import fastmcp -from fastmcp import FastMCP from pydantic import BaseModel from starlette.responses import JSONResponse @@ -280,8 +280,18 @@ async def initialize(self) -> None: # Re-raise other errors raise - # Build indices - await self.client.build_indices_and_constraints() + # Build indices - wrap in try/except to handle Neo4j 5.x race condition + # with parallel IF NOT EXISTS index creation + try: + await self.client.build_indices_and_constraints() + except Exception as idx_error: + if 'EquivalentSchemaRuleAlreadyExists' in str(idx_error): + logger.warning( + 'Index creation race condition detected (Neo4j 5.x issue). ' + 'Indexes likely already exist. Continuing...' + ) + else: + raise logger.info('Successfully initialized Graphiti client') @@ -920,7 +930,9 @@ async def run_mcp_server(): logger.info( f'Running MCP server with SSE transport on {fastmcp.settings.host}:{fastmcp.settings.port}' ) - logger.info(f'Access the server at: http://{fastmcp.settings.host}:{fastmcp.settings.port}/sse') + logger.info( + f'Access the server at: http://{fastmcp.settings.host}:{fastmcp.settings.port}/sse' + ) await mcp.run_sse_async() elif mcp_config.transport == 'http': # Use localhost for display if binding to 0.0.0.0 @@ -944,7 +956,7 @@ async def run_mcp_server(): # Configure uvicorn logging to match our format configure_uvicorn_logging() - await mcp.run_http_async(transport="http") + await mcp.run_http_async(transport='http') else: raise ValueError( f'Unsupported transport: {mcp_config.transport}. Use "sse", "stdio", or "http"' diff --git a/src/server.py b/src/server.py index e192477..d305515 100644 --- a/src/server.py +++ b/src/server.py @@ -154,7 +154,19 @@ async def initialize(self) -> None: max_coroutines=self.semaphore_limit, ) - await self.client.build_indices_and_constraints() + # Build indices - wrap in try/except to handle Neo4j 5.x race condition + # with parallel IF NOT EXISTS index creation + try: + await self.client.build_indices_and_constraints() + except Exception as idx_error: + if 'EquivalentSchemaRuleAlreadyExists' in str(idx_error): + logger.warning( + 'Index creation race condition detected (Neo4j 5.x issue). ' + 'Indexes likely already exist. Continuing...' + ) + else: + raise + logger.info('Successfully initialized Graphiti client') except Exception as e: diff --git a/tests/test_neo4j_race_condition.py b/tests/test_neo4j_race_condition.py new file mode 100644 index 0000000..023e94e --- /dev/null +++ b/tests/test_neo4j_race_condition.py @@ -0,0 +1,118 @@ +"""Tests for Neo4j 5.x race condition handling during index creation.""" + +import logging +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +@pytest.mark.integration +@pytest.mark.requires_neo4j +async def test_neo4j_index_race_condition_handled(): + """Test that EquivalentSchemaRuleAlreadyExists is gracefully handled.""" + from config.schema import ServerConfig + from src.server import GraphitiService + + # Create a properly configured mock config + config = MagicMock(spec=ServerConfig) + config.database = MagicMock() + config.database.provider = 'neo4j' + config.llm = MagicMock() + config.llm.provider = 'openai' + config.embedder = MagicMock() + config.embedder.provider = 'openai' + config.graphiti = MagicMock() + config.graphiti.group_id = 'test_group' + + service = GraphitiService(config) + + # Mock the Graphiti client to raise the race condition error + mock_client = AsyncMock() + mock_error = Exception('Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists') + mock_client.build_indices_and_constraints.side_effect = mock_error + + with ( + patch('src.server.Graphiti', return_value=mock_client), + patch('services.factories.LLMClientFactory.create', return_value=AsyncMock()), + patch('services.factories.EmbedderFactory.create', return_value=AsyncMock()), + ): + # Should complete without raising + await service.initialize() + + # Verify client was created + assert service.client is not None + + +@pytest.mark.integration +@pytest.mark.requires_neo4j +async def test_neo4j_other_index_errors_are_raised(): + """Test that non-race-condition errors are properly raised.""" + from config.schema import ServerConfig + from src.server import GraphitiService + + # Create a properly configured mock config + config = MagicMock(spec=ServerConfig) + config.database = MagicMock() + config.database.provider = 'neo4j' + config.llm = MagicMock() + config.llm.provider = 'openai' + config.embedder = MagicMock() + config.embedder.provider = 'openai' + config.graphiti = MagicMock() + config.graphiti.group_id = 'test_group' + + service = GraphitiService(config) + + # Mock the Graphiti client to raise a different error + mock_client = AsyncMock() + mock_error = Exception('Some other database error') + mock_client.build_indices_and_constraints.side_effect = mock_error + + # Should raise the original error + with ( + patch('src.server.Graphiti', return_value=mock_client), + patch('services.factories.LLMClientFactory.create', return_value=AsyncMock()), + patch('services.factories.EmbedderFactory.create', return_value=AsyncMock()), + pytest.raises(Exception, match='Some other database error'), + ): + await service.initialize() + + +@pytest.mark.integration +@pytest.mark.requires_neo4j +async def test_neo4j_race_condition_logs_warning(caplog): + """Test that race condition triggers a warning log.""" + from config.schema import ServerConfig + from src.server import GraphitiService + + # Create a properly configured mock config + config = MagicMock(spec=ServerConfig) + config.database = MagicMock() + config.database.provider = 'neo4j' + config.llm = MagicMock() + config.llm.provider = 'openai' + config.embedder = MagicMock() + config.embedder.provider = 'openai' + config.graphiti = MagicMock() + config.graphiti.group_id = 'test_group' + + service = GraphitiService(config) + + # Mock the Graphiti client to raise the race condition error + mock_client = AsyncMock() + mock_error = Exception('Neo.ClientError.Schema.EquivalentSchemaRuleAlreadyExists') + mock_client.build_indices_and_constraints.side_effect = mock_error + + with ( + caplog.at_level(logging.WARNING), + patch('src.server.Graphiti', return_value=mock_client), + patch('services.factories.LLMClientFactory.create', return_value=AsyncMock()), + patch('services.factories.EmbedderFactory.create', return_value=AsyncMock()), + ): + await service.initialize() + + # Verify warning was logged + assert any( + 'Index creation race condition detected' in record.message + for record in caplog.records + )