240 changes: 129 additions & 111 deletions lightrag/api/lightrag_server.py
@@ -2,67 +2,60 @@
LightRAG FastAPI Server
"""

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.openapi.docs import (
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html,
)
import os
import configparser
import logging
import logging.config
import os
import sys
import uvicorn
import pipmaster as pm
from fastapi.staticfiles import StaticFiles
from fastapi.responses import RedirectResponse
from contextlib import asynccontextmanager
from pathlib import Path
import configparser

import pipmaster as pm
import uvicorn
from ascii_colors import ASCIIColors
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import (
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html,
)
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles

from lightrag import LightRAG
from lightrag import __version__ as core_version
from lightrag.api import __api_version__
from lightrag.api.auth import auth_handler
from lightrag.api.routers.document_routes import DocumentManager, create_document_routes
from lightrag.api.routers.graph_routes import create_graph_routes
from lightrag.api.routers.ollama_api import OllamaAPI
from lightrag.api.routers.query_routes import create_query_routes
from lightrag.api.utils_api import (
get_combined_auth_dependency,
display_splash_screen,
check_env_file,
display_splash_screen,
get_combined_auth_dependency,
)
from .config import (
global_args,
update_uvicorn_mode_config,
get_default_host,
)
from lightrag.utils import get_env_value
from lightrag import LightRAG, __version__ as core_version
from lightrag.api import __api_version__
from lightrag.types import GPTKeywordExtractionFormat
from lightrag.utils import EmbeddingFunc
from lightrag.constants import (
DEFAULT_LOG_MAX_BYTES,
DEFAULT_EMBEDDING_TIMEOUT,
DEFAULT_LLM_TIMEOUT,
DEFAULT_LOG_BACKUP_COUNT,
DEFAULT_LOG_FILENAME,
DEFAULT_LLM_TIMEOUT,
DEFAULT_EMBEDDING_TIMEOUT,
)
from lightrag.api.routers.document_routes import (
DocumentManager,
create_document_routes,
DEFAULT_LOG_MAX_BYTES,
)
from lightrag.api.routers.query_routes import create_query_routes
from lightrag.api.routers.graph_routes import create_graph_routes
from lightrag.api.routers.ollama_api import OllamaAPI

from lightrag.utils import logger, set_verbose_debug
from lightrag.kg.shared_storage import (
get_namespace_data,
get_default_workspace,
# set_default_workspace,
cleanup_keyed_lock,
finalize_share_data,
get_default_workspace,
get_namespace_data,
set_default_workspace,
)
from fastapi.security import OAuth2PasswordRequestForm
from lightrag.api.auth import auth_handler
from lightrag.types import GPTKeywordExtractionFormat
from lightrag.utils import EmbeddingFunc, get_env_value, logger, set_verbose_debug

from .config import get_default_host, global_args, update_uvicorn_mode_config

# use the .env that is inside the current folder
# allows using a different .env file for each lightrag instance
@@ -343,30 +336,104 @@ def create_app(args):
# Check if API key is provided either through env var or args
api_key = os.getenv("LIGHTRAG_API_KEY") or args.key

# Initialize document manager with workspace support for data isolation
doc_manager = DocumentManager(args.input_dir, workspace=args.workspace)
doc_manager_cache = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup and shutdown events"""
# Store background tasks
app.state.background_tasks = set()
def create_doc_manager(request: Request | None) -> DocumentManager:
"""Create or retrieve DocumentManager for the current workspace"""
workspace = args.workspace
if request is not None:
workspace = get_workspace_from_request(request, args.workspace)

logger.debug(f"Using DocumentManager for workspace: '{workspace}'")
if workspace in doc_manager_cache:
return doc_manager_cache[workspace]

doc_manager = DocumentManager(args.input_dir, workspace=workspace)
doc_manager_cache[workspace] = doc_manager

return doc_manager_cache[workspace]

rag_cache = {}

async def create_rag(request: Request | None) -> LightRAG:
"""Create or retrieve LightRAG instance for the current workspace"""
workspace = args.workspace
if request is not None:
workspace = get_workspace_from_request(request, args.workspace)

logger.debug(f"Using LightRAG instance for workspace: '{workspace}'")
if workspace in rag_cache:
return rag_cache[workspace]

# Create ollama_server_infos from command line arguments
from lightrag.api.config import OllamaServerInfos

ollama_server_infos = OllamaServerInfos(name=args.simulated_model_name, tag=args.simulated_model_tag)

# Initialize RAG with unified configuration
try:
rag = LightRAG(
working_dir=args.working_dir,
workspace=workspace,
llm_model_func=create_llm_model_func(args.llm_binding),
llm_model_name=args.llm_model,
llm_model_max_async=args.max_async,
summary_max_tokens=args.summary_max_tokens,
summary_context_size=args.summary_context_size,
chunk_token_size=int(args.chunk_size),
chunk_overlap_token_size=int(args.chunk_overlap_size),
llm_model_kwargs=create_llm_model_kwargs(args.llm_binding, args, llm_timeout),
embedding_func=embedding_func,
default_llm_timeout=llm_timeout,
default_embedding_timeout=embedding_timeout,
kv_storage=args.kv_storage,
graph_storage=args.graph_storage,
vector_storage=args.vector_storage,
doc_status_storage=args.doc_status_storage,
vector_db_storage_cls_kwargs={"cosine_better_than_threshold": args.cosine_threshold},
enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract,
enable_llm_cache=args.enable_llm_cache,
rerank_model_func=rerank_model_func,
max_parallel_insert=args.max_parallel_insert,
max_graph_nodes=args.max_graph_nodes,
addon_params={
"language": args.summary_language,
"entity_types": args.entity_types,
},
ollama_server_infos=ollama_server_infos,
)

# Initialize database connections
# Note: initialize_storages() now auto-initializes pipeline_status for rag.workspace
await rag.initialize_storages()

# Data migration regardless of storage implementation
await rag.check_and_migrate_data()

rag_cache[workspace] = rag
return rag
except Exception as e:
logger.error(f"Failed to initialize LightRAG: {e}")
raise

Bug: Race condition when creating cached LightRAG instances

The create_rag and create_doc_manager functions check and modify rag_cache and doc_manager_cache without any synchronization. When multiple concurrent requests arrive for the same workspace, each can pass the cache check (if workspace in rag_cache) before any has finished initializing and storing the instance. This causes multiple LightRAG instances to be created for the same workspace, with the expensive initialize_storages() and check_and_migrate_data() running in parallel. The last one to finish overwrites others in the cache, potentially leaving orphaned resources. An asyncio.Lock per-cache (or per-workspace keyed lock) is needed to make the check-and-create pattern atomic.

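A minimal sketch of the fix the comment suggests, using a single lock per cache; the helper name get_or_create and the factory parameter are illustrative, not part of the PR:

import asyncio

_rag_cache_lock = asyncio.Lock()
rag_cache: dict[str, object] = {}

async def get_or_create(workspace: str, factory):
    """Return the cached instance for workspace, creating it at most once."""
    async with _rag_cache_lock:
        if workspace not in rag_cache:
            # Expensive setup (initialize_storages, data migration) now runs
            # exactly once per workspace; concurrent requests wait here
            # instead of racing past the membership check.
            rag_cache[workspace] = await factory(workspace)
        return rag_cache[workspace]

A single lock serializes first-time creation across all workspaces; the per-workspace keyed lock the comment also mentions (a dict of locks guarded by one outer lock) avoids that at the cost of managing the lock table itself.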


@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup and shutdown events"""
# Store background tasks
app.state.background_tasks = set()

try:
create_doc_manager(None) # Pre-create default DocumentManager
await create_rag(None) # Pre-create default LightRAG

ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")

yield

finally:
# Clean up database connections
await rag.finalize_storages()
for rag in rag_cache.values():
await rag.finalize_storages()

if "LIGHTRAG_GUNICORN_MODE" not in os.environ:
# Only perform cleanup in Uvicorn single-process mode
@@ -404,6 +471,7 @@ async def lifespan(app: FastAPI):
"tryItOutEnabled": True,
}

set_default_workspace(args.workspace)
app = FastAPI(**app_kwargs)

# Add custom validation error handler for /query/data endpoint
@@ -456,7 +524,7 @@ def get_cors_origins():
# Create combined auth dependency for all endpoints
combined_auth = get_combined_auth_dependency(api_key)

def get_workspace_from_request(request: Request) -> str | None:
def get_workspace_from_request(request: Request, default: str) -> str:
"""
Extract workspace from HTTP request header or use default.

@@ -474,7 +542,7 @@ def get_workspace_from_request(request: Request) -> str | None:
workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()

if not workspace:
workspace = None
workspace = default

return workspace
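
A client selects a workspace per request through this header. A hedged example (the header name comes from get_workspace_from_request above; the endpoint path, payload shape, host, and port are assumptions):

import httpx

resp = httpx.post(
    "http://localhost:9621/query",
    headers={"LIGHTRAG-WORKSPACE": "tenant-a"},  # omit to use the server default
    json={"query": "What does the knowledge graph contain?"},
)
print(resp.json())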

@@ -1022,66 +1090,19 @@ async def server_rerank_func(
else:
logger.info("Reranking is disabled")

# Create ollama_server_infos from command line arguments
from lightrag.api.config import OllamaServerInfos

ollama_server_infos = OllamaServerInfos(
name=args.simulated_model_name, tag=args.simulated_model_tag
)

# Initialize RAG with unified configuration
try:
rag = LightRAG(
working_dir=args.working_dir,
workspace=args.workspace,
llm_model_func=create_llm_model_func(args.llm_binding),
llm_model_name=args.llm_model,
llm_model_max_async=args.max_async,
summary_max_tokens=args.summary_max_tokens,
summary_context_size=args.summary_context_size,
chunk_token_size=int(args.chunk_size),
chunk_overlap_token_size=int(args.chunk_overlap_size),
llm_model_kwargs=create_llm_model_kwargs(
args.llm_binding, args, llm_timeout
),
embedding_func=embedding_func,
default_llm_timeout=llm_timeout,
default_embedding_timeout=embedding_timeout,
kv_storage=args.kv_storage,
graph_storage=args.graph_storage,
vector_storage=args.vector_storage,
doc_status_storage=args.doc_status_storage,
vector_db_storage_cls_kwargs={
"cosine_better_than_threshold": args.cosine_threshold
},
enable_llm_cache_for_entity_extract=args.enable_llm_cache_for_extract,
enable_llm_cache=args.enable_llm_cache,
rerank_model_func=rerank_model_func,
max_parallel_insert=args.max_parallel_insert,
max_graph_nodes=args.max_graph_nodes,
addon_params={
"language": args.summary_language,
"entity_types": args.entity_types,
},
ollama_server_infos=ollama_server_infos,
)
except Exception as e:
logger.error(f"Failed to initialize LightRAG: {e}")
raise

# Add routes
app.include_router(
create_document_routes(
rag,
doc_manager,
create_rag,
create_doc_manager,
api_key,
)
)
app.include_router(create_query_routes(rag, api_key, args.top_k))
app.include_router(create_graph_routes(rag, api_key))
app.include_router(create_query_routes(create_rag, api_key, args.top_k))
app.include_router(create_graph_routes(create_rag, api_key))

# Add Ollama API routes
ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key)
ollama_api = OllamaAPI(create_rag, top_k=args.top_k, api_key=api_key)
app.include_router(ollama_api.router, prefix="/api")

# Custom Swagger UI endpoint for offline support
@@ -1212,10 +1233,7 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends()):
async def get_status(request: Request):
"""Get current system status including WebUI availability"""
try:
workspace = get_workspace_from_request(request)
default_workspace = get_default_workspace()
if workspace is None:
workspace = default_workspace
workspace = get_workspace_from_request(request, get_default_workspace())
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=workspace
)
@@ -1250,7 +1268,7 @@ async def get_status(request: Request):
"vector_storage": args.vector_storage,
"enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
"enable_llm_cache": args.enable_llm_cache,
"workspace": default_workspace,
"workspace": workspace,
"max_graph_nodes": args.max_graph_nodes,
# Rerank configuration
"enable_rerank": rerank_model_func is not None,