88
99import duckdb
1010import numpy as np
11+ import pyarrow as pa # type: ignore[import-untyped]
1112from numpy .typing import NDArray
1213
1314from microrag .exceptions import StorageError
@@ -88,37 +89,51 @@ def _init_schema(self) -> None:
8889 conn .execute ("SET hnsw_enable_experimental_persistence = true" )
8990
9091 # Create documents table
91- conn .execute (f"""
92+ conn .execute (
93+ f"""
9294 CREATE TABLE IF NOT EXISTS documents (
9395 id VARCHAR PRIMARY KEY,
9496 content TEXT NOT NULL,
9597 metadata JSON,
9698 embedding FLOAT[{ self ._embedding_dim } ]
9799 )
98- """ )
100+ """
101+ )
99102
100103 def add_documents (self , documents : Sequence [Document ]) -> None :
101- """Add documents to storage."""
104+ """Add documents to storage using PyArrow bulk import for performance ."""
102105 if not documents :
103106 return
104107
105- logger .debug ("Storing %d document(s) in DuckDB" , len (documents ))
108+ logger .debug ("Storing %d document(s) in DuckDB via PyArrow " , len (documents ))
106109 conn = self .conn
107110 try :
111+ # Validate embeddings
108112 for doc in documents :
109113 if doc .embedding is None :
110114 raise StorageError (f"Document { doc .id } has no embedding" )
111115
112- embedding_list = doc .embedding .tolist ()
113- metadata_json = json .dumps (doc .metadata )
116+ # Create PyArrow table - DuckDB can query it directly by name
117+ arrow_table = pa .table ( # noqa: F841
118+ {
119+ "id" : [doc .id for doc in documents ],
120+ "content" : [doc .content for doc in documents ],
121+ "metadata" : [json .dumps (doc .metadata ) for doc in documents ],
122+ "embedding" : pa .array (
123+ [doc .embedding .tolist () for doc in documents ], # type: ignore[union-attr]
124+ type = pa .list_ (pa .float32 ()),
125+ ),
126+ }
127+ )
114128
115- conn .execute (
116- """
117- INSERT OR REPLACE INTO documents (id, content, metadata, embedding)
118- VALUES (?, ?, ?, ?)
119- """ ,
120- [doc .id , doc .content , metadata_json , embedding_list ],
121- )
129+ # Bulk import directly from PyArrow table (no temp file needed)
130+ conn .execute (
131+ f"""
132+ INSERT OR REPLACE INTO documents
133+ SELECT id, content, metadata::JSON, embedding::FLOAT[{ self ._embedding_dim } ]
134+ FROM arrow_table
135+ """
136+ )
122137
123138 # Invalidate indexes after adding documents
124139 self ._vector_index_built = False
@@ -223,7 +238,8 @@ def build_vector_index(
223238 conn .execute ("DROP INDEX IF EXISTS documents_embedding_idx" )
224239
225240 # Create HNSW index
226- conn .execute (f"""
241+ conn .execute (
242+ f"""
227243 CREATE INDEX documents_embedding_idx ON documents
228244 USING HNSW (embedding)
229245 WITH (
@@ -232,7 +248,8 @@ def build_vector_index(
232248 ef_search = { ef_search } ,
233249 m = { m }
234250 )
235- """ )
251+ """
252+ )
236253
237254 self ._vector_index_built = True
238255 logger .debug ("HNSW vector index built" )
@@ -246,7 +263,8 @@ def build_fts_index(self) -> None:
246263 conn = self .conn
247264
248265 # Create FTS index using PRAGMA
249- conn .execute ("""
266+ conn .execute (
267+ """
250268 PRAGMA create_fts_index(
251269 'documents',
252270 'id',
@@ -257,7 +275,8 @@ def build_fts_index(self) -> None:
257275 strip_accents = 1,
258276 lower = 1
259277 )
260- """ )
278+ """
279+ )
261280
262281 self ._fts_index_built = True
263282 logger .debug ("FTS index built" )
0 commit comments