129 changes: 99 additions & 30 deletions lightrag/api/routers/document_routes.py
@@ -859,6 +859,72 @@ def get_unique_filename_in_enqueued(target_dir: Path, original_name: str) -> str
return f"{base_name}_{timestamp}{extension}"


async def _extract_pdf_with_docling(file_path: Path) -> str:
"""Extract text from PDF using Docling engine."""
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore

converter = DocumentConverter()
result = converter.convert(file_path)
return result.document.export_to_markdown()


async def _extract_pdf_with_pypdf2(file_bytes: bytes) -> tuple[str, list[dict]]:
"""Extract text and page data from PDF using PyPDF2."""
if not pm.is_installed("pypdf2"): # type: ignore
pm.install("pypdf2")
from PyPDF2 import PdfReader # type: ignore
from io import BytesIO

pdf_file = BytesIO(file_bytes)
reader = PdfReader(pdf_file)
content = ""
page_data = []
char_position = 0

for page_num, page in enumerate(reader.pages, start=1):
page_text = page.extract_text() + "\n"
page_start = char_position
page_end = char_position + len(page_text)

page_data.append(
{
"page_number": page_num,
"content": page_text,
"char_start": page_start,
"char_end": page_end,
}
)

content += page_text
char_position = page_end

return content, page_data


async def _handle_file_processing_error(
rag: LightRAG,
filename: str,
error_type: str,
error_msg: str,
file_size: int,
track_id: str,
) -> None:
"""Handle file processing errors consistently."""
error_files = [
{
"file_path": filename,
"error_description": f"[File Extraction]{error_type}",
"original_error": error_msg,
"file_size": file_size,
}
]

await rag.apipeline_enqueue_error_documents(error_files, track_id)
logger.error(f"[File Extraction]{error_type} for {filename}: {error_msg}")


async def pipeline_enqueue_file(
rag: LightRAG, file_path: Path, track_id: str = None
) -> tuple[bool, str]:
@@ -878,6 +944,7 @@ async def pipeline_enqueue_file(

try:
content = ""
page_data = None # Initialize page data (will be populated for PDFs)
ext = file_path.suffix.lower()
file_size = 0

@@ -1029,38 +1096,20 @@ async def pipeline_enqueue_file(

case ".pdf":
try:
page_data = []
if global_args.document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter # type: ignore

converter = DocumentConverter()
result = converter.convert(file_path)
content = result.document.export_to_markdown()
content = await _extract_pdf_with_docling(file_path)
# TODO: Extract page-level data from Docling
else:
if not pm.is_installed("pypdf2"): # type: ignore
pm.install("pypdf2")
from PyPDF2 import PdfReader # type: ignore
from io import BytesIO

pdf_file = BytesIO(file)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
content, page_data = await _extract_pdf_with_pypdf2(file)
except Exception as e:
error_files = [
{
"file_path": str(file_path.name),
"error_description": "[File Extraction]PDF processing error",
"original_error": f"Failed to extract text from PDF: {str(e)}",
"file_size": file_size,
}
]
await rag.apipeline_enqueue_error_documents(
error_files, track_id
)
logger.error(
f"[File Extraction]Error processing PDF {file_path.name}: {str(e)}"
await _handle_file_processing_error(
rag,
file_path.name,
"PDF processing error",
str(e),
file_size,
track_id,
)
return False, track_id

@@ -1239,8 +1288,28 @@ async def pipeline_enqueue_file(
return False, track_id

try:
# Pass page_data if it was collected (only for PDFs with PyPDF2)
page_data_to_pass = (
[page_data]
if page_data is not None and len(page_data) > 0
else None
)

# Debug logging
if page_data_to_pass:
logger.info(
f"Passing page metadata for {file_path.name}: {len(page_data_to_pass[0])} pages"
)
else:
logger.debug(
f"No page metadata for {file_path.name} (non-PDF or extraction failed)"
)

await rag.apipeline_enqueue_documents(
content, file_paths=file_path.name, track_id=track_id
content,
file_paths=file_path.name,
track_id=track_id,
page_data_list=page_data_to_pass,
)

logger.info(
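The new `_extract_pdf_with_pypdf2` helper records half-open character ranges (`char_start`/`char_end`) for every page, and `pipeline_enqueue_file` forwards that list as `page_data_list`. The diff does not show how the chunker consumes those offsets, but the mapping they enable can be sketched as follows; `pages_for_span` is a hypothetical illustration, not code from this PR:

```python
from typing import TypedDict


class PageSpan(TypedDict):
    page_number: int
    content: str
    char_start: int
    char_end: int


def pages_for_span(page_data: list[PageSpan], chunk_start: int, chunk_end: int) -> dict:
    """Map a chunk's character range onto 1-based page numbers (illustrative only)."""
    hit = [
        p["page_number"]
        for p in page_data
        if p["char_start"] < chunk_end and p["char_end"] > chunk_start
    ]
    if not hit:
        return {"start_page": None, "end_page": None, "pages": None}
    return {"start_page": hit[0], "end_page": hit[-1], "pages": hit}


# A chunk spanning characters 950..1200 of a PDF whose pages cover 0..1000 and
# 1000..2100 touches both pages.
page_data = [
    {"page_number": 1, "content": "...", "char_start": 0, "char_end": 1000},
    {"page_number": 2, "content": "...", "char_start": 1000, "char_end": 2100},
]
print(pages_for_span(page_data, 950, 1200))
# {'start_page': 1, 'end_page': 2, 'pages': [1, 2]}
```

Non-PDF inputs and the Docling path (see the TODO above) produce no page list, so `page_data_to_pass` stays `None` and only the debug log fires.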
4 changes: 4 additions & 0 deletions lightrag/base.py
@@ -77,6 +77,10 @@ class TextChunkSchema(TypedDict):
content: str
full_doc_id: str
chunk_order_index: int
# Page tracking fields (optional for backward compatibility)
start_page: int | None # 1-based page number where chunk starts
end_page: int | None # 1-based page number where chunk ends (inclusive)
pages: list[int] | None # List of all pages this chunk spans [start, end]


T = TypeVar("T")
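The three new keys are optional, keeping existing callers backward compatible. For reference, chunks carrying them might look like the following (values are illustrative; `TextChunkSchema` keys outside this hunk are omitted):

```python
# Chunk from page-aware PDF extraction: all three optional fields populated.
pdf_chunk = {
    "content": "Extracted text of the chunk...",
    "full_doc_id": "doc-9b1c",
    "chunk_order_index": 0,
    "start_page": 3,   # 1-based page where the chunk starts
    "end_page": 4,     # inclusive end page
    "pages": [3, 4],   # every page the chunk spans
}

# Chunk from a non-PDF source: the page fields stay None.
text_chunk = {
    "content": "Plain text chunk...",
    "full_doc_id": "doc-42af",
    "chunk_order_index": 1,
    "start_page": None,
    "end_page": None,
    "pages": None,
}
```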
46 changes: 39 additions & 7 deletions lightrag/kg/postgres_impl.py
@@ -1782,6 +1782,11 @@ async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"content": v["content"],
"file_path": v["file_path"],
"llm_cache_list": json.dumps(v.get("llm_cache_list", [])),
"start_page": v.get("start_page"), # Optional page fields
"end_page": v.get("end_page"),
"pages": json.dumps(v.get("pages"))
if v.get("pages") is not None
else None,
"create_time": current_time,
"update_time": current_time,
}
@@ -1794,6 +1799,9 @@ async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"content": v["content"],
"doc_name": v.get("file_path", ""), # Map file_path to doc_name
"workspace": self.workspace,
"page_data": json.dumps(v.get("page_data"))
if v.get("page_data") is not None
else None,
}
await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
@@ -1949,6 +1957,11 @@ def _upsert_chunks(
"content": item["content"],
"content_vector": json.dumps(item["__vector__"].tolist()),
"file_path": item["file_path"],
"start_page": item.get("start_page"), # Optional page fields
"end_page": item.get("end_page"),
"pages": json.dumps(item.get("pages"))
if item.get("pages") is not None
else None,
"create_time": current_time,
"update_time": current_time,
}
@@ -4508,6 +4521,7 @@ def namespace_to_table_name(namespace: str) -> str:
doc_name VARCHAR(1024),
content TEXT,
meta JSONB,
page_data JSONB,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id)
@@ -4523,6 +4537,9 @@ def namespace_to_table_name(namespace: str) -> str:
content TEXT,
file_path TEXT NULL,
llm_cache_list JSONB NULL DEFAULT '[]'::jsonb,
start_page INTEGER NULL,
end_page INTEGER NULL,
pages JSONB NULL,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
@@ -4538,6 +4555,9 @@ def namespace_to_table_name(namespace: str) -> str:
content TEXT,
content_vector VECTOR({os.environ.get("EMBEDDING_DIM", 1024)}),
file_path TEXT NULL,
start_page INTEGER NULL,
end_page INTEGER NULL,
pages JSONB NULL,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_VDB_CHUNKS_PK PRIMARY KEY (workspace, id)
@@ -4632,12 +4652,14 @@ def namespace_to_table_name(namespace: str) -> str:
SQL_TEMPLATES = {
# SQL for KVStorage
"get_by_id_full_docs": """SELECT id, COALESCE(content, '') as content,
COALESCE(doc_name, '') as file_path
COALESCE(doc_name, '') as file_path,
page_data
FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id=$2
""",
"get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
chunk_order_index, full_doc_id, file_path,
COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list,
start_page, end_page, pages,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
@@ -4684,11 +4706,12 @@ def namespace_to_table_name(namespace: str) -> str:
FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id IN ({ids})
""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
"upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace)
VALUES ($1, $2, $3, $4)
"upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace, page_data)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (workspace,id) DO UPDATE
SET content = $2,
doc_name = $3,
page_data = $5,
update_time = CURRENT_TIMESTAMP
""",
"upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,chunk_id,cache_type,queryparam)
@@ -4703,15 +4726,18 @@ def namespace_to_table_name(namespace: str) -> str:
""",
"upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, file_path, llm_cache_list,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
start_page, end_page, pages, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (workspace,id) DO UPDATE
SET tokens=EXCLUDED.tokens,
chunk_order_index=EXCLUDED.chunk_order_index,
full_doc_id=EXCLUDED.full_doc_id,
content = EXCLUDED.content,
file_path=EXCLUDED.file_path,
llm_cache_list=EXCLUDED.llm_cache_list,
start_page=EXCLUDED.start_page,
end_page=EXCLUDED.end_page,
pages=EXCLUDED.pages,
update_time = EXCLUDED.update_time
""",
"upsert_full_entities": """INSERT INTO LIGHTRAG_FULL_ENTITIES (workspace, id, entity_names, count,
@@ -4733,15 +4759,18 @@ def namespace_to_table_name(namespace: str) -> str:
# SQL for VectorStorage
"upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector, file_path,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
start_page, end_page, pages, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (workspace,id) DO UPDATE
SET tokens=EXCLUDED.tokens,
chunk_order_index=EXCLUDED.chunk_order_index,
full_doc_id=EXCLUDED.full_doc_id,
content = EXCLUDED.content,
content_vector=EXCLUDED.content_vector,
file_path=EXCLUDED.file_path,
start_page=EXCLUDED.start_page,
end_page=EXCLUDED.end_page,
pages=EXCLUDED.pages,
update_time = EXCLUDED.update_time
""",
"upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
@@ -4790,6 +4819,9 @@ def namespace_to_table_name(namespace: str) -> str:
SELECT c.id,
c.content,
c.file_path,
c.start_page,
c.end_page,
c.pages,
EXTRACT(EPOCH FROM c.create_time)::BIGINT AS created_at
FROM LIGHTRAG_VDB_CHUNKS c
WHERE c.workspace = $1
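To make the column wiring concrete, here is a minimal sketch of binding the thirteen parameters of the updated `upsert_text_chunk` template with asyncpg directly (abridged `ON CONFLICT` clause). The repository goes through its own `PostgreSQLDB` wrapper rather than raw asyncpg, and the DSN and values below are assumptions for illustration; the point is that `start_page`/`end_page` bind as plain integers while `pages` and `llm_cache_list` are JSON-encoded strings for the JSONB columns.

```python
import asyncio
import json
from datetime import datetime

import asyncpg


async def upsert_chunk_example() -> None:
    # Placeholder connection string; LightRAG itself uses its PostgreSQLDB wrapper.
    conn = await asyncpg.connect("postgresql://lightrag:lightrag@localhost/lightrag")
    sql = """
        INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
            chunk_order_index, full_doc_id, content, file_path, llm_cache_list,
            start_page, end_page, pages, create_time, update_time)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
        ON CONFLICT (workspace, id) DO UPDATE
        SET content = EXCLUDED.content,
            start_page = EXCLUDED.start_page,
            end_page = EXCLUDED.end_page,
            pages = EXCLUDED.pages,
            update_time = EXCLUDED.update_time
    """
    now = datetime.now()
    await conn.execute(
        sql,
        "default",             # $1  workspace
        "chunk-0001",           # $2  id
        512,                    # $3  tokens
        0,                      # $4  chunk_order_index
        "doc-9b1c",             # $5  full_doc_id
        "Extracted text ...",   # $6  content
        "report.pdf",           # $7  file_path
        json.dumps([]),         # $8  llm_cache_list -> JSONB
        3,                      # $9  start_page     -> INTEGER
        4,                      # $10 end_page       -> INTEGER
        json.dumps([3, 4]),     # $11 pages          -> JSONB (hence json.dumps)
        now,                    # $12 create_time
        now,                    # $13 update_time
    )
    await conn.close()


if __name__ == "__main__":
    asyncio.run(upsert_chunk_example())
```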