From 394186513cd019e0a199da7cfd5a8c14e9b9c5c8 Mon Sep 17 00:00:00 2001
From: jerryao
Date: Fri, 28 Mar 2025 22:27:48 +0800
Subject: [PATCH] Add local vector store implementation as an Azure AI Search alternative

---
 graphrag/vector_stores/factory.py            |  28 +-
 graphrag/vector_stores/local_vector_store.py | 439 +++++++++++++++++++
 local_vector_store_config.yml                |  37 ++
 pyproject.toml                               |   1 +
 test_local_store.py                          |  73 +++
 test_local_store_advanced.py                 | 236 ++++++++++
 6 files changed, 813 insertions(+), 1 deletion(-)
 create mode 100644 graphrag/vector_stores/local_vector_store.py
 create mode 100644 local_vector_store_config.yml
 create mode 100644 test_local_store.py
 create mode 100644 test_local_store_advanced.py

diff --git a/graphrag/vector_stores/factory.py b/graphrag/vector_stores/factory.py
index 1c37316d0c..e3ed70d8be 100644
--- a/graphrag/vector_stores/factory.py
+++ b/graphrag/vector_stores/factory.py
@@ -4,12 +4,13 @@
 """A package containing a factory and supported vector store types."""
 
 from enum import Enum
-from typing import ClassVar
+from typing import Any, ClassVar
 
 from graphrag.vector_stores.azure_ai_search import AzureAISearchVectorStore
 from graphrag.vector_stores.base import BaseVectorStore
 from graphrag.vector_stores.cosmosdb import CosmosDBVectoreStore
 from graphrag.vector_stores.lancedb import LanceDBVectorStore
+from graphrag.vector_stores.local_vector_store import LocalVectorStore
 
 
 class VectorStoreType(str, Enum):
@@ -18,6 +19,7 @@ class VectorStoreType(str, Enum):
     LanceDB = "lancedb"
     AzureAISearch = "azure_ai_search"
     CosmosDB = "cosmosdb"
+    Local = "local"
 
 
 class VectorStoreFactory:
@@ -45,8 +47,32 @@ def create_vector_store(
                 return AzureAISearchVectorStore(**kwargs)
             case VectorStoreType.CosmosDB:
                 return CosmosDBVectoreStore(**kwargs)
+            case VectorStoreType.Local:
+                return LocalVectorStore(**kwargs)
             case _:
                 if vector_store_type in cls.vector_store_types:
                     return cls.vector_store_types[vector_store_type](**kwargs)
                 msg = f"Unknown vector store type: {vector_store_type}"
                 raise ValueError(msg)
+
+def get_vector_store(
+    store_type: VectorStoreType,
+    collection_name: str,
+    **kwargs: Any,
+) -> BaseVectorStore:
+    """Get a vector store instance based on the store type."""
+    store_map: dict[VectorStoreType, type[BaseVectorStore]] = {
+        VectorStoreType.LanceDB: LanceDBVectorStore,
+        VectorStoreType.AzureAISearch: AzureAISearchVectorStore,
+        VectorStoreType.CosmosDB: CosmosDBVectoreStore,
+        VectorStoreType.Local: LocalVectorStore,
+    }
+
+    store_class = store_map.get(store_type)
+    if store_class is None:
+        msg = f"Unsupported vector store type: {store_type}"
+        raise ValueError(msg)
+
+    store = store_class(collection_name=collection_name)
+    store.connect(**kwargs)
+    return store
diff --git a/graphrag/vector_stores/local_vector_store.py b/graphrag/vector_stores/local_vector_store.py
new file mode 100644
index 0000000000..868fde47ff
--- /dev/null
+++ b/graphrag/vector_stores/local_vector_store.py
@@ -0,0 +1,439 @@
+"""Local vector storage implementation."""
+
+import json
+import os
+import gzip
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Generator, Tuple
+import math
+
+import numpy as np
+from scipy.spatial.distance import cosine
+
+from graphrag.data_model.types import TextEmbedder
+from graphrag.vector_stores.base import (
+    BaseVectorStore,
+    VectorStoreDocument,
+    VectorStoreSearchResult,
+)
+
+
+class LocalVectorStore(BaseVectorStore):
+    """Local vector storage implementation using file system."""
+
+    def __init__(self, **kwargs: Any) -> None:
+        super().__init__(**kwargs)
+        self._data_dir: Path | None = None
+        self._documents: Dict[str, VectorStoreDocument] = {}
+        self._max_chunk_size: int = kwargs.get("max_chunk_size", 1000)
+        self._compression_enabled: bool = kwargs.get("compression_enabled", False)
+        self._vectors_loaded: bool = False
+        self._doc_metadata: Dict[str, Dict[str, Any]] = {}
+
+    def connect(self, **kwargs: Any) -> Any:
+        """Connect to the local vector storage."""
+        db_uri = kwargs.get("db_uri", "./data/vector_store")
+        self._data_dir = Path(db_uri)
+        self._data_dir.mkdir(parents=True, exist_ok=True)
+        self._max_chunk_size = kwargs.get("max_chunk_size", 1000)
+        self._compression_enabled = kwargs.get("compression_enabled", False)
+        self._load_existing_metadata()  # Load only metadata first
+
+    def _get_metadata_path(self) -> Path:
+        """Get path to metadata file."""
+        if not self._data_dir:
+            raise ValueError("Data directory not configured")
+        return self._data_dir / f"{self.collection_name}_metadata.json"
+
+    def _get_chunk_path(self, chunk_id: int) -> Path:
+        """Get path to a specific chunk file."""
+        if not self._data_dir:
+            raise ValueError("Data directory not configured")
+        filename = f"{self.collection_name}_chunk_{chunk_id}.json"
+        if self._compression_enabled:
+            filename += ".gz"
+        return self._data_dir / filename
+
+    def _load_existing_metadata(self) -> None:
+        """Load existing metadata from disk."""
+        metadata_path = self._get_metadata_path()
+        if metadata_path.exists():
+            with open(metadata_path, "r", encoding="utf-8") as f:
+                metadata = json.load(f)
+                self._doc_metadata = metadata.get("documents", {})
+
+    def _load_vectors_if_needed(self) -> None:
+        """Load vectors from disk if not already loaded."""
+        if self._vectors_loaded:
+            return
+
+        # First clear any existing documents (keeping metadata)
+        self._documents.clear()
+
+        # Load chunks
+        chunk_id = 0
+        while True:
+            chunk_path = self._get_chunk_path(chunk_id)
+            if not chunk_path.exists():
+                break
+
+            try:
+                if self._compression_enabled:
+                    with gzip.open(chunk_path, "rt", encoding="utf-8") as f:
+                        chunk_data = json.load(f)
+                else:
+                    with open(chunk_path, "r", encoding="utf-8") as f:
+                        chunk_data = json.load(f)
+
+                for doc_id, doc_data in chunk_data.items():
+                    self._documents[doc_id] = VectorStoreDocument(
+                        id=doc_id,
+                        text=doc_data["text"],
+                        vector=doc_data["vector"],
+                        attributes=doc_data["attributes"],
+                    )
+            except Exception as e:
+                print(f"Error loading chunk {chunk_id}: {e}")
+
+            chunk_id += 1
+
+        self._vectors_loaded = True
+
+    def _save_metadata(self) -> None:
+        """Save metadata to disk."""
+        if not self._data_dir:
+            return
+
+        metadata_path = self._get_metadata_path()
+        metadata = {
+            "collection_name": self.collection_name,
+            "document_count": len(self._doc_metadata),
+            "chunk_count": math.ceil(len(self._doc_metadata) / self._max_chunk_size),
+            "documents": self._doc_metadata
+        }
+
+        with open(metadata_path, "w", encoding="utf-8") as f:
+            json.dump(metadata, f, ensure_ascii=False, indent=2)
+
+    def _save_data(self) -> None:
+        """Save data to disk in chunks."""
+        if not self._data_dir:
+            return
+
+        # Update metadata
+        self._doc_metadata = {
+            doc_id: {
+                "id": doc_id,
+                "text_length": len(doc.text) if doc.text else 0,
+                "attributes": doc.attributes
+            }
+            for doc_id, doc in self._documents.items()
+        }
+
+        # Save metadata
+        self._save_metadata()
+
+        # Chunk documents and save
+        chunks = self._chunk_documents(self._documents, self._max_chunk_size)
+
+        # First clear any existing chunks
+        chunk_id = 0
+        while True:
+            chunk_path = self._get_chunk_path(chunk_id)
+            if chunk_path.exists():
+                chunk_path.unlink()
+            else:
+                break
+            chunk_id += 1
+
+        # Save new chunks
+        for chunk_id, chunk_docs in enumerate(chunks):
+            chunk_data = {
+                doc_id: {
+                    "text": doc.text,
+                    "vector": doc.vector,
+                    "attributes": doc.attributes,
+                }
+                for doc_id, doc in chunk_docs.items()
+            }
+
+            chunk_path = self._get_chunk_path(chunk_id)
+
+            if self._compression_enabled:
+                with gzip.open(chunk_path, "wt", encoding="utf-8") as f:
+                    json.dump(chunk_data, f, ensure_ascii=False)
+            else:
+                with open(chunk_path, "w", encoding="utf-8") as f:
+                    json.dump(chunk_data, f, ensure_ascii=False, indent=2)
+
+    def _chunk_documents(
+        self, documents: Dict[str, VectorStoreDocument], chunk_size: int
+    ) -> List[Dict[str, VectorStoreDocument]]:
+        """Split documents into chunks of specified size."""
+        if not documents:
+            return []
+
+        doc_items = list(documents.items())
+        chunk_count = math.ceil(len(doc_items) / chunk_size)
+        chunks = []
+
+        for i in range(chunk_count):
+            start_idx = i * chunk_size
+            end_idx = min((i + 1) * chunk_size, len(doc_items))
+            chunk = dict(doc_items[start_idx:end_idx])
+            chunks.append(chunk)
+
+        return chunks
+
+    def load_documents(
+        self, documents: List[VectorStoreDocument], overwrite: bool = True
+    ) -> None:
+        """Load documents into vector storage."""
+        # Ensure vectors are loaded first
+        self._load_vectors_if_needed()
+
+        if overwrite:
+            self._documents.clear()
+
+        for doc in documents:
+            if doc.vector is not None:
+                self._documents[doc.id] = doc
+
+        self._save_data()
+
+    def load_documents_in_chunks(
+        self, documents: List[VectorStoreDocument], chunk_size: int = 100, overwrite: bool = True
+    ) -> None:
+        """Load documents in chunks to handle large document sets.
+
+        Args:
+            documents: List of documents to load
+            chunk_size: Size of each processing chunk
+            overwrite: Whether to overwrite existing documents
+        """
+        # Ensure vectors are loaded if needed
+        self._load_vectors_if_needed()
+
+        if overwrite:
+            self._documents.clear()
+
+        # Process in chunks
+        for i in range(0, len(documents), chunk_size):
+            chunk = documents[i:i+chunk_size]
+
+            for doc in chunk:
+                if doc.vector is not None:
+                    self._documents[doc.id] = doc
+
+            # Save periodically to avoid memory issues
+            if (i + chunk_size) >= len(documents) or (i + chunk_size) % (chunk_size * 10) == 0:
+                self._save_data()
+
+    def filter_by_id(self, include_ids: List[str] | List[int]) -> Any:
+        """Build a query filter to filter documents by id."""
+        if not include_ids:
+            self.query_filter = None
+        else:
+            self.query_filter = include_ids
+        return self.query_filter
+
+    def filter_by_attributes(self, attribute_filters: Dict[str, Any]) -> None:
+        """Filter documents by attributes.
+
+        Args:
+            attribute_filters: Dictionary of attribute name and value to filter by
+        """
+        if not attribute_filters:
+            self.query_filter = None
+            return
+
+        # Load metadata if needed
+        if not self._doc_metadata:
+            self._load_existing_metadata()
+
+        # Find matching document IDs
+        matching_ids = []
+        for doc_id, metadata in self._doc_metadata.items():
+            attributes = metadata.get("attributes", {})
+            match = True
+
+            for attr_name, attr_value in attribute_filters.items():
+                if attr_name not in attributes or attributes[attr_name] != attr_value:
+                    match = False
+                    break
+
+            if match:
+                matching_ids.append(doc_id)
+
+        self.query_filter = matching_ids
+
+    def _compute_similarity(self, vec1: List[float], vec2: List[float]) -> float:
+        """Compute cosine similarity between two vectors."""
+        return 1 - cosine(vec1, vec2)
+
+    def similarity_search_by_vector(
+        self, query_embedding: List[float], k: int = 10, **kwargs: Any
+    ) -> List[VectorStoreSearchResult]:
+        """Perform a vector-based similarity search."""
+        # Ensure vectors are loaded
+        self._load_vectors_if_needed()
+
+        results = []
+        for doc_id, doc in self._documents.items():
+            if self.query_filter and doc_id not in self.query_filter:
+                continue
+
+            if doc.vector is not None:
+                similarity = self._compute_similarity(query_embedding, doc.vector)
+                results.append(
+                    VectorStoreSearchResult(
+                        document=doc,
+                        score=similarity,
+                    )
+                )
+
+        # Sort by similarity score and take top k
+        results.sort(key=lambda x: x.score, reverse=True)
+        return results[:k]
+
+    def similarity_search_by_text(
+        self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any
+    ) -> List[VectorStoreSearchResult]:
+        """Perform a similarity search using a given input text."""
+        query_embedding = text_embedder(text)
+        if query_embedding:
+            return self.similarity_search_by_vector(query_embedding, k)
+        return []
+
+    def search_by_id(self, id: str) -> VectorStoreDocument:
+        """Search for a document by id."""
+        # First check if document exists in metadata
+        if id not in self._doc_metadata and not self._vectors_loaded:
+            # If not in metadata and vectors aren't loaded, we know it doesn't exist
+            return VectorStoreDocument(id=id, text=None, vector=None)
+
+        # Load vectors if needed
+        self._load_vectors_if_needed()
+        return self._documents.get(id, VectorStoreDocument(id=id, text=None, vector=None))
+
+    def export_data(self, export_path: str | None = None) -> str:
+        """Export all data to a JSON file.
+
+        Args:
+            export_path: Optional path for export file. If not provided,
+                uses collection name in data directory.
+
+        Returns:
+            Path to the exported file.
+        """
+        # Ensure vectors are loaded
+        self._load_vectors_if_needed()
+
+        if not export_path:
+            if not self._data_dir:
+                raise ValueError("No data directory configured")
+            export_path = self._data_dir / f"{self.collection_name}_export.json"
+        else:
+            export_path = Path(export_path)
+
+        export_data = {
+            "collection_name": self.collection_name,
+            "documents": [
+                {
+                    "id": doc.id,
+                    "text": doc.text,
+                    "vector": doc.vector,
+                    "attributes": doc.attributes
+                }
+                for doc in self._documents.values()
+            ]
+        }
+
+        with open(export_path, "w", encoding="utf-8") as f:
+            json.dump(export_data, f, ensure_ascii=False, indent=2)
+
+        return str(export_path)
+
+    def import_data(self, import_path: str, merge: bool = False) -> int:
+        """Import data from a JSON file.
+
+        Args:
+            import_path: Path to the import file
+            merge: If True, merge with existing data; if False, replace
+
+        Returns:
+            Number of documents imported
+        """
+        # Ensure vectors are loaded if doing a merge
+        if merge:
+            self._load_vectors_if_needed()
+
+        import_path = Path(import_path)
+        if not import_path.exists():
+            raise FileNotFoundError(f"Import file not found: {import_path}")
+
+        with open(import_path, "r", encoding="utf-8") as f:
+            import_data = json.load(f)
+
+        if not merge:
+            self._documents.clear()
+
+        documents = []
+        for doc_data in import_data.get("documents", []):
+            doc = VectorStoreDocument(
+                id=doc_data["id"],
+                text=doc_data["text"],
+                vector=doc_data["vector"],
+                attributes=doc_data["attributes"]
+            )
+            documents.append(doc)
+
+        # Use chunked loading for large imports
+        self.load_documents_in_chunks(documents, chunk_size=self._max_chunk_size, overwrite=False)
+        return len(documents)
+
+    def get_document_count(self) -> int:
+        """Get the total number of documents in the store."""
+        # Use metadata if available
+        if self._doc_metadata:
+            return len(self._doc_metadata)
+        else:
+            # Fallback to loaded documents
+            self._load_vectors_if_needed()
+            return len(self._documents)
+
+    def clear(self) -> None:
+        """Clear all documents from the store."""
+        self._documents.clear()
+        self._doc_metadata.clear()
+        self._save_data()
+
+    def get_stats(self) -> Dict[str, Any]:
+        """Get statistics about the vector store."""
+        # Load metadata to ensure it's up to date
+        self._load_existing_metadata()
+
+        # Calculate disk usage
+        disk_usage = 0
+        metadata_path = self._get_metadata_path()
+        if metadata_path.exists():
+            disk_usage += metadata_path.stat().st_size
+
+        chunk_id = 0
+        while True:
+            chunk_path = self._get_chunk_path(chunk_id)
+            if chunk_path.exists():
+                disk_usage += chunk_path.stat().st_size
+                chunk_id += 1
+            else:
+                break
+
+        return {
+            "collection_name": self.collection_name,
+            "document_count": len(self._doc_metadata),
+            "disk_usage_bytes": disk_usage,
+            "disk_usage_mb": round(disk_usage / (1024 * 1024), 2),
+            "compression_enabled": self._compression_enabled,
+            "chunk_count": chunk_id,
+            "vectors_loaded": self._vectors_loaded
+        }
\ No newline at end of file
diff --git a/local_vector_store_config.yml b/local_vector_store_config.yml
new file mode 100644
index 0000000000..e5c2dbeeee
--- /dev/null
+++ b/local_vector_store_config.yml
@@ -0,0 +1,37 @@
+vector_store:
+  default_vector_store:
+    type: "local"
+    db_uri: "./data/vector_store"
+    container_name: "graph_data"
+    max_chunk_size: 1000
+    compression_enabled: true
+
+models:
+  default_chat_model:
+    type: "openai_chat"
+    model: "gpt-4-turbo-preview"
+    auth_type: "api_key"
+    api_key: "${OPENAI_API_KEY}"
+
+  default_embedding_model:
+    type: "openai_embedding"
+    model: "text-embedding-3-small"
+    auth_type: "api_key"
+    api_key: "${OPENAI_API_KEY}"
+
+input:
+  type: local
+  file_type: text
+  base_dir: input
+
+cache:
+  type: local
+  base_dir: cache
+
+storage:
+  type: local
+  base_dir: output
+
+reporting:
+  type: local
+  base_dir: reports
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index c10e35b2a5..89e943ac1b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -69,6 +69,7 @@ networkx = "^3.4.2"
 pandas = "^2.2.3"
 pyarrow = "^15.0.0"
 umap-learn = "^0.5.6"
+scipy = "^1.12.0"
 
 # Configuration
 pyyaml = "^6.0.2"
diff --git a/test_local_store.py b/test_local_store.py
new file mode 100644
index 0000000000..7aa4eaea22
--- /dev/null
+++ b/test_local_store.py
@@ -0,0 +1,73 @@
+"""Test script for local vector store."""
+
+import asyncio
+from pathlib import Path
+
+from graphrag.config.models.graph_rag_config import GraphRagConfig
+from graphrag.data_model.types import TextEmbedder
+from graphrag.vector_stores.base import VectorStoreDocument
+from graphrag.vector_stores.factory import VectorStoreType, get_vector_store
+
+
+class MockEmbedder(TextEmbedder):
+    """Mock text embedder for testing."""
+
+    def __call__(self, text: str) -> list[float]:
+        """Generate a mock embedding."""
+        # Simple mock embedding: convert text to ASCII values and normalize
+        return [ord(c) / 255.0 for c in text[:10]]  # Use first 10 chars
+
+
+async def main():
+    """Test the local vector store."""
+    # Create necessary directories
+    Path("data/vector_store").mkdir(parents=True, exist_ok=True)
+    Path("input").mkdir(exist_ok=True)
+    Path("cache").mkdir(exist_ok=True)
+    Path("output").mkdir(exist_ok=True)
+    Path("reports").mkdir(exist_ok=True)
+
+    # Initialize vector store
+    store = get_vector_store(
+        store_type=VectorStoreType.Local,
+        collection_name="test_collection",
+        db_uri="./data/vector_store",
+    )
+
+    # Create test documents
+    test_docs = [
+        VectorStoreDocument(
+            id="doc1",
+            text="This is a test document about machine learning",
+            vector=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
+            attributes={"category": "ml"},
+        ),
+        VectorStoreDocument(
+            id="doc2",
+            text="This is another document about artificial intelligence",
+            vector=[0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 0.1],
+            attributes={"category": "ai"},
+        ),
+    ]
+
+    # Load documents
+    store.load_documents(test_docs)
+
+    # Test similarity search
+    embedder = MockEmbedder()
+    query = "machine learning"
+    results = store.similarity_search_by_text(query, embedder, k=2)
+
+    print("\nSearch Results:")
+    for result in results:
+        print(f"\nDocument: {result.document.text}")
+        print(f"Score: {result.score}")
+        print(f"Attributes: {result.document.attributes}")
+
+    # Test search by ID
+    doc = store.search_by_id("doc1")
+    print(f"\nDocument by ID: {doc.text}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/test_local_store_advanced.py b/test_local_store_advanced.py
new file mode 100644
index 0000000000..7b8e3b1b97
--- /dev/null
+++ b/test_local_store_advanced.py
@@ -0,0 +1,236 @@
+"""Advanced test script for local vector store."""
+
+import asyncio
+import os
+import random
+import shutil
+import time
+from pathlib import Path
+
+from graphrag.data_model.types import TextEmbedder
+from graphrag.vector_stores.base import VectorStoreDocument
+from graphrag.vector_stores.factory import VectorStoreType, get_vector_store
+
+
+class MockEmbedder(TextEmbedder):
+    """Mock text embedder for testing."""
+
+    def __call__(self, text: str) -> list[float]:
+        """Generate a mock embedding."""
+        # Simple mock embedding based on hash of the text
+        import hashlib
+
+        hash_obj = hashlib.md5(text.encode())
+        hash_bytes = hash_obj.digest()
+
+        # Convert hash bytes to normalized floats
+        vec = [float(b) / 255.0 for b in hash_bytes]
+        # Pad or truncate to 10 dimensions
+        return vec[:10] if len(vec) >= 10 else vec + [0.0] * (10 - len(vec))
+
+
+def generate_test_docs(count: int) -> list[VectorStoreDocument]:
+    """Generate test documents with random content."""
+    categories = ["ml", "ai", "nlp", "cv", "rl"]
+    topics = ["machine learning", "neural networks", "deep learning",
+              "computer vision", "natural language processing",
+              "reinforcement learning", "transformers", "attention"]
+
+    docs = []
+    for i in range(count):
+        category = random.choice(categories)
+        topic1 = random.choice(topics)
+        topic2 = random.choice(topics)
+
+        text = f"Document {i+1}: This is a test document about {topic1} and {topic2}."
+
+        # Create a deterministic but varied vector
+        vec = [
+            (i % 10) / 10.0,
+            ((i + 1) % 10) / 10.0,
+            ((i + 2) % 10) / 10.0,
+            ((i + 3) % 10) / 10.0,
+            ((i + 4) % 10) / 10.0,
+            ((i + 5) % 10) / 10.0,
+            ((i + 6) % 10) / 10.0,
+            ((i + 7) % 10) / 10.0,
+            ((i + 8) % 10) / 10.0,
+            ((i + 9) % 10) / 10.0,
+        ]
+
+        doc = VectorStoreDocument(
+            id=f"doc_{i+1}",
+            text=text,
+            vector=vec,
+            attributes={"category": category, "length": len(text)}
+        )
+        docs.append(doc)
+
+    return docs
+
+
+async def test_basic_functionality():
+    """Test basic functionality."""
+    print("\n--- Testing Basic Functionality ---")
+
+    # Create store
+    store = get_vector_store(
+        store_type=VectorStoreType.Local,
+        collection_name="test_basic",
+        db_uri="./data/vector_store"
+    )
+
+    # Generate and load documents
+    docs = generate_test_docs(5)
+    store.load_documents(docs)
+
+    # Test search by ID
+    doc = store.search_by_id("doc_1")
+    print(f"Document by ID: {doc.text}")
+
+    # Test similarity search
+    embedder = MockEmbedder()
+    query = "machine learning"
+    results = store.similarity_search_by_text(query, embedder, k=2)
+
+    print("\nSearch Results:")
+    for result in results:
+        print(f"\nDocument: {result.document.text}")
+        print(f"Score: {result.score}")
+        print(f"Attributes: {result.document.attributes}")
+
+
+async def test_large_dataset():
+    """Test with a larger dataset."""
+    print("\n--- Testing Large Dataset ---")
+
+    # Create store with chunking and compression
+    store = get_vector_store(
+        store_type=VectorStoreType.Local,
+        collection_name="test_large",
+        db_uri="./data/vector_store",
+        max_chunk_size=50,
+        compression_enabled=True
+    )
+
+    # Generate and load documents in chunks
+    doc_count = 200
+    print(f"Generating {doc_count} documents...")
+    docs = generate_test_docs(doc_count)
+
+    print("Loading documents...")
+    start_time = time.time()
+    store.load_documents_in_chunks(docs, chunk_size=50)
+    load_time = time.time() - start_time
+    print(f"Documents loaded in {load_time:.2f} seconds")
+
+    # Test search
+    embedder = MockEmbedder()
+    query = "neural networks"
+
+    print("Searching...")
+    start_time = time.time()
+    results = store.similarity_search_by_text(query, embedder, k=5)
+    search_time = time.time() - start_time
+    print(f"Search completed in {search_time:.2f} seconds")
+
+    print(f"\nTop result: {results[0].document.text}")
+    print(f"Score: {results[0].score}")
+
+    # Get stats
+    stats = store.get_stats()
+    print("\nStore Statistics:")
+    for key, value in stats.items():
+        print(f"{key}: {value}")
+
+
+async def test_filter_by_attributes():
+    """Test filtering by attributes."""
+    print("\n--- Testing Filter by Attributes ---")
+
+    # Create store
+    store = get_vector_store(
+        store_type=VectorStoreType.Local,
+        collection_name="test_filters",
+        db_uri="./data/vector_store"
+    )
+
+    # Generate and load documents
+    docs = generate_test_docs(100)
+    store.load_documents(docs)
+
+    # Filter by category
+    embedder = MockEmbedder()
+    query = "machine learning"
+
+    print("\nWithout filter:")
+    results = store.similarity_search_by_text(query, embedder, k=3)
+    for result in results:
+        print(f"Document: {result.document.text}")
+        print(f"Category: {result.document.attributes['category']}")
+
+    print("\nWith 'ml' category filter:")
+    store.filter_by_attributes({"category": "ml"})
+    results = store.similarity_search_by_text(query, embedder, k=3)
+    for result in results:
+        print(f"Document: {result.document.text}")
+        print(f"Category: {result.document.attributes['category']}")
+
+
+async def test_export_import():
+    """Test export and import functionality."""
+    print("\n--- Testing Export and Import ---")
+
+    # Create source store
+    source_store = get_vector_store(
+        store_type=VectorStoreType.Local,
+        collection_name="test_export",
+        db_uri="./data/vector_store"
+    )
+
+    # Generate and load documents
+    docs = generate_test_docs(20)
+    source_store.load_documents(docs)
+
+    # Export data
+    export_path = source_store.export_data("./data/vector_store/export_test.json")
+    print(f"Data exported to {export_path}")
+
+    # Create target store
+    target_store = get_vector_store(
+        store_type=VectorStoreType.Local,
+        collection_name="test_import",
+        db_uri="./data/vector_store"
+    )
+
+    # Import data
+    doc_count = target_store.import_data(export_path)
+    print(f"Imported {doc_count} documents")
+
+    # Verify imported data
+    print(f"Target store document count: {target_store.get_document_count()}")
+    doc = target_store.search_by_id("doc_1")
+    print(f"Sample document: {doc.text}")
+
+
+async def main():
+    """Run all tests."""
+    # Create necessary directories
+    test_dir = Path("data/vector_store")
+
+    # Clear previous test data
+    if test_dir.exists():
+        shutil.rmtree(test_dir)
+    test_dir.mkdir(parents=True, exist_ok=True)
+
+    # Run tests
+    await test_basic_functionality()
+    await test_large_dataset()
+    await test_filter_by_attributes()
+    await test_export_import()
+
+    print("\nAll tests completed!")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file