From 1b0c89d2f9c9e16d4866176c031f769178f8ebdd Mon Sep 17 00:00:00 2001
From: Cedric Krusche
Date: Wed, 2 Jul 2025 15:36:20 +0200
Subject: [PATCH 01/10] Implement get through filter

---
 langchain_postgres/v2/async_vectorstore.py | 95 ++++++++++++++++++++++
 1 file changed, 95 insertions(+)

diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py
index fd0bfd7..7051865 100644
--- a/langchain_postgres/v2/async_vectorstore.py
+++ b/langchain_postgres/v2/async_vectorstore.py
@@ -674,6 +674,69 @@ async def __query_collection(
             return combined_results
         return dense_results
 
+    async def __query_collection_with_filter(
+        self,
+        *,
+        k: Optional[int] = None,
+        filter: Optional[dict] = None,
+        **kwargs: Any,
+    ) -> Sequence[RowMapping]:
+        """
+        Perform filter query on database.
+        Queries might be slow if the hybrid search column does not exist.
+        For best hybrid search performance, consider creating a TSV column
+        and adding GIN index.
+        """
+        if not k:
+            k = (
+                max(
+                    self.k,
+                    self.hybrid_search_config.primary_top_k,
+                    self.hybrid_search_config.secondary_top_k,
+                )
+                if self.hybrid_search_config
+                else self.k
+            )
+
+        columns = [
+            self.id_column,
+            self.content_column,
+            self.embedding_column,
+        ] + self.metadata_columns
+        if self.metadata_json_column:
+            columns.append(self.metadata_json_column)
+
+        column_names = ", ".join(f'"{col}"' for col in columns)
+
+        safe_filter = None
+        filter_dict = None
+        if filter and isinstance(filter, dict):
+            safe_filter, filter_dict = self._create_filter_clause(filter)
+
+        where_filters = f"WHERE {safe_filter}" if safe_filter else ""
+        dense_query_stmt = f"""SELECT {column_names}
+        FROM "{self.schema_name}"."{self.table_name}" {where_filters} LIMIT :k;
+        """
+        param_dict = {"k": k}
+        if filter_dict:
+            param_dict.update(filter_dict)
+        if self.index_query_options:
+            async with self.engine.connect() as conn:
+                # Set each query option individually
+                for query_option in self.index_query_options.to_parameter():
+                    query_options_stmt = f"SET LOCAL {query_option};"
+                    await conn.execute(text(query_options_stmt))
+                result = await conn.execute(text(dense_query_stmt), param_dict)
+                result_map = result.mappings()
+                results = result_map.fetchall()
+        else:
+            async with self.engine.connect() as conn:
+                result = await conn.execute(text(dense_query_stmt), param_dict)
+                result_map = result.mappings()
+                results = result_map.fetchall()
+
+        return results
+
     async def asimilarity_search(
         self,
         query: str,
@@ -997,6 +1060,38 @@ async def is_valid_index(
             results = result_map.fetchall()
         return bool(len(results) == 1)
 
+    async def aget(
+        self,
+        filter: Optional[dict] = None,
+        k: Optional[int] = None,
+        **kwargs: Any,
+    ):
+
+        results = await self.__query_collection_with_filter(
+            k=k, filter=filter, **kwargs
+        )
+
+        documents = []
+        for row in results:
+            metadata = (
+                row[self.metadata_json_column]
+                if self.metadata_json_column and row[self.metadata_json_column]
+                else {}
+            )
+            for col in self.metadata_columns:
+                metadata[col] = row[col]
+            documents.append(
+                (
+                    Document(
+                        page_content=row[self.content_column],
+                        metadata=metadata,
+                        id=str(row[self.id_column]),
+                    ),
+                )
+            )
+
+        return documents
+
     async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]:
         """Get documents by ids."""
 

From 99d1440e6989b257361314c5e2e5a3db19d74d0c Mon Sep 17 00:00:00 2001
From: Cedric Krusche
Date: Wed, 2 Jul 2025 17:08:20 +0200
Subject: [PATCH 02/10] Implement synchronous get with pydoc and test cases

---
 langchain_postgres/v2/async_vectorstore.py | 49 ++++++++++++++-----
 langchain_postgres/v2/vectorstores.py      | 23 +++++
 .../v2/test_async_pg_vectorstore_search.py | 13 +++++
 .../v2/test_pg_vectorstore_search.py       | 14 ++++++
 4 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py
index 7051865..7cbeefd 100644
--- a/langchain_postgres/v2/async_vectorstore.py
+++ b/langchain_postgres/v2/async_vectorstore.py
@@ -682,11 +682,22 @@ async def __query_collection_with_filter(
         **kwargs: Any,
     ) -> Sequence[RowMapping]:
         """
-        Perform filter query on database.
-        Queries might be slow if the hybrid search column does not exist.
-        For best hybrid search performance, consider creating a TSV column
-        and adding GIN index.
+        Asynchronously query the database collection using optional filters and return matching rows.
+
+        Args:
+            k (Optional[int]): The maximum number of rows to retrieve. If not provided, a default is
+                computed based on the hybrid search configuration or a fallback value.
+            filter (Optional[dict]): A dictionary representing filtering conditions to apply in the SQL WHERE clause.
+            **kwargs (Any): Additional keyword arguments (currently unused but accepted for extensibility).
+
+        Returns:
+            Sequence[RowMapping]: A sequence of row mappings, representing a Document
+
+        Notes:
+            - If `k` is not specified, it defaults to the maximum of the configured top-k values.
+            - If `index_query_options` are set, they are applied using `SET LOCAL` before executing the query.
         """
+
         if not k:
             k = (
                 max(
@@ -1065,7 +1076,23 @@ async def aget(
         filter: Optional[dict] = None,
         k: Optional[int] = None,
         **kwargs: Any,
-    ):
+    ) -> list[Document]:
+        """
+        Asynchronously retrieves documents from a collection based on an optional filter and other parameters.
+
+        This method queries the underlying collection using the provided filter and additional keyword arguments.
+        It constructs a list of `Document` objects from the query results, combining content and metadata from
+        specified columns.
+
+        Args:
+            filter (Optional[dict]): A dictionary specifying filtering criteria for the query. Defaults to None.
+            k (Optional[int]): The maximum number of documents to retrieve. If None, retrieves all matching documents.
+            **kwargs (Any): Additional keyword arguments passed to the internal query method.
+
+        Returns:
+            list[Document]: A list of `Document` instances, each containing content, metadata, and an identifier.
+
+        """
 
         results = await self.__query_collection_with_filter(
             k=k, filter=filter, **kwargs
@@ -1081,13 +1108,11 @@ async def aget(
             for col in self.metadata_columns:
                 metadata[col] = row[col]
             documents.append(
-                (
-                    Document(
-                        page_content=row[self.content_column],
-                        metadata=metadata,
-                        id=str(row[self.id_column]),
-                    ),
-                )
+                Document(
+                    page_content=row[self.content_column],
+                    metadata=metadata,
+                    id=str(row[self.id_column]),
+                ),
             )
 
         return documents

diff --git a/langchain_postgres/v2/vectorstores.py b/langchain_postgres/v2/vectorstores.py
index 52224db..d140d26 100644
--- a/langchain_postgres/v2/vectorstores.py
+++ b/langchain_postgres/v2/vectorstores.py
@@ -853,6 +853,29 @@ async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]:
         """Get documents by ids."""
         return await self._engine._run_as_async(self.__vs.aget_by_ids(ids=ids))
 
+    def get(
+        self,
+        filter: Optional[dict] = None,
+        k: Optional[int] = None,
+        **kwargs: Any,
+    ) -> list[Document]:
+        """
+        Retrieve documents from the collection using optional filters and parameters.
+
+        Args:
+            filter (Optional[dict]): A dictionary specifying filtering criteria for the query. Defaults to None.
+            k (Optional[int]): The maximum number of documents to retrieve. If None, retrieves all matching documents.
+            **kwargs (Any): Additional keyword arguments passed to the asynchronous `aget` method.
+
+        Returns:
+            list[Document]: A list of `Document` instances matching the filter criteria.
+
+        Raises:
+            Any exceptions raised by the underlying asynchronous method or the sync execution engine.
+        """
+
+        return self._engine._run_as_sync(self.__vs.aget(filter=filter, k=k, **kwargs))
+
     def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
         """Get documents by ids."""
         return self._engine._run_as_sync(self.__vs.aget_by_ids(ids=ids))

diff --git a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
index 16c70fd..cdc4cfb 100644
--- a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
+++ b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
@@ -370,6 +370,19 @@ async def test_vectorstore_with_metadata_filters(
         )
         assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter
 
+    @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES)
+    async def test_vectorstore_get(
+        self,
+        vs_custom_filter: AsyncPGVectorStore,
+        test_filter: dict,
+        expected_ids: list[str],
+    ) -> None:
+        """Test end to end construction and filter."""
+        docs = await vs_custom_filter.aget(test_filter, k=5)
+        assert set([doc.metadata["code"] for doc in docs]) == set(
+            expected_ids
+        ), test_filter
+
     async def test_asimilarity_hybrid_search(self, vs: AsyncPGVectorStore) -> None:
         results = await vs.asimilarity_search(
             "foo", k=1, hybrid_search_config=HybridSearchConfig()

diff --git a/tests/unit_tests/v2/test_pg_vectorstore_search.py b/tests/unit_tests/v2/test_pg_vectorstore_search.py
index 7815a25..9697883 100644
--- a/tests/unit_tests/v2/test_pg_vectorstore_search.py
+++ b/tests/unit_tests/v2/test_pg_vectorstore_search.py
@@ -429,6 +429,20 @@ def test_sync_vectorstore_with_metadata_filters(
         docs = vs_custom_filter_sync.similarity_search("meow", k=5, filter=test_filter)
         assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter
 
+    @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES)
+    def test_sync_vectorstore_get(
+        self,
+        vs_custom_filter_sync: PGVectorStore,
+        test_filter: dict,
+        expected_ids: list[str],
+    ) -> None:
+        """Test end to end construction and filter."""
+
+        docs = vs_custom_filter_sync.get(k=5, filter=test_filter)
+        assert set([doc.metadata["code"] for doc in docs]) == set(
+            expected_ids
+        ), test_filter
+
     @pytest.mark.parametrize("test_filter", NEGATIVE_TEST_CASES)
     def test_metadata_filter_negative_tests(
         self, vs_custom_filter_sync: PGVectorStore, test_filter: dict
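The two patches above give both store classes a direct lookup path that bypasses similarity search entirely. A minimal usage sketch of the API as it stands at this point in the series — `store` is an assumed, already-initialized `PGVectorStore` whose table carries a `topic` metadata column, and all construction boilerplate is omitted:

```python
# Hedged sketch: direct retrieval without a similarity query, per patches 01-02.
# `store` (a PGVectorStore) and the "topic" metadata column are illustrative
# assumptions, not part of the patches themselves.
docs = store.get(filter={"topic": {"$eq": "animals"}}, k=5)
for doc in docs:
    print(doc.id, doc.page_content, doc.metadata)

# The async counterpart accepts the same arguments:
# docs = await store.aget(filter={"topic": {"$eq": "animals"}}, k=5)
```

Note that at this stage `k` only caps the row count via a SQL `LIMIT`, and rows come back in whatever order Postgres produces them — the generated statement has no ORDER BY.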
From a9864930f3e873951c1157b47aadd37851b9ff80 Mon Sep 17 00:00:00 2001
From: Cedric Krusche
Date: Fri, 11 Jul 2025 16:04:47 +0200
Subject: [PATCH 03/10] Implement missing async implementation into sync
 vector store

---
 langchain_postgres/v2/vectorstores.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/langchain_postgres/v2/vectorstores.py b/langchain_postgres/v2/vectorstores.py
index d140d26..69f9fd8 100644
--- a/langchain_postgres/v2/vectorstores.py
+++ b/langchain_postgres/v2/vectorstores.py
@@ -853,6 +853,21 @@ async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]:
         """Get documents by ids."""
         return await self._engine._run_as_async(self.__vs.aget_by_ids(ids=ids))
 
+    def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
+        """Get documents by ids."""
+        return self._engine._run_as_sync(self.__vs.aget_by_ids(ids=ids))
+
+    async def aget(
+        self,
+        filter: Optional[dict] = None,
+        k: Optional[int] = None,
+        **kwargs: Any,
+    ) -> list[Document]:
+
+        return await self._engine._run_as_async(
+            self.__vs.aget(filter=filter, k=k, **kwargs)
+        )
+
     def get(
         self,
         filter: Optional[dict] = None,
@@ -876,9 +891,5 @@ def get(
 
         return self._engine._run_as_sync(self.__vs.aget(filter=filter, k=k, **kwargs))
 
-    def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
-        """Get documents by ids."""
-        return self._engine._run_as_sync(self.__vs.aget_by_ids(ids=ids))
-
     def get_table_name(self) -> str:
         return self.__vs.table_name

From 6fafde4c56357d683d8588df32769253a009ddce Mon Sep 17 00:00:00 2001
From: gRedHeadphone
Date: Mon, 13 Oct 2025 06:59:31 +0000
Subject: [PATCH 04/10] fix(docs): update command to install packages using uv
 & update port to default port used in tests

---
 DEVELOPMENT.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 37095a3..b26725d 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -13,7 +13,7 @@ source .venv/bin/activate
 
 Install package in editable mode.
 ```shell
-poetry install --with dev,test,lint
+uv sync --group test
 ```
 
 Start PostgreSQL/PGVector.
 ```shell
 docker run --rm -it --name pgvector-container \
     -e POSTGRES_USER=langchain \
     -e POSTGRES_PASSWORD=langchain \
     -e POSTGRES_DB=langchain_test \
-    -p 6024:5432 pgvector/pgvector:pg16 \
+    -p 5432:5432 pgvector/pgvector:pg16 \
     postgres -c log_statement=all
 ```

From 3dad796ab37efcbb434db018316674462d432c43 Mon Sep 17 00:00:00 2001
From: gRedHeadphone
Date: Thu, 16 Oct 2025 07:13:52 +0000
Subject: [PATCH 05/10] fix(docs): update tests invoking command

---
 DEVELOPMENT.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index b26725d..7cedf45 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -22,11 +22,12 @@ docker run --rm -it --name pgvector-container \
     -e POSTGRES_USER=langchain \
     -e POSTGRES_PASSWORD=langchain \
     -e POSTGRES_DB=langchain_test \
-    -p 5432:5432 pgvector/pgvector:pg16 \
+    -p 6024:5432 pgvector/pgvector:pg16 \
     postgres -c log_statement=all
 ```
 
 Invoke test cases.
 ```shell
+export POSTGRES_PORT=6024
 pytest -vvv
 ```

From a7dd0bddb594a9195dbb570dd142e438782ab842 Mon Sep 17 00:00:00 2001
From: gRedHeadphone
Date: Thu, 23 Oct 2025 10:13:24 +0000
Subject: [PATCH 06/10] feat: add limit offset parameters to get methods
 similar to chroma vector store

---
 langchain_postgres/v2/async_vectorstore.py | 69 +++++++------------
 langchain_postgres/v2/vectorstores.py      | 28 +++----
 .../v2/test_async_pg_vectorstore_search.py | 13 +++-
 .../v2/test_pg_vectorstore_search.py       | 13 +++-
 4 files changed, 55 insertions(+), 68 deletions(-)

diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py
index e531d3b..a860cfe 100644
--- a/langchain_postgres/v2/async_vectorstore.py
+++ b/langchain_postgres/v2/async_vectorstore.py
@@ -675,37 +675,16 @@ async def __query_collection(
     async def __query_collection_with_filter(
         self,
         *,
-        k: Optional[int] = None,
+        limit: Optional[int] = None,
+        offset: Optional[int] = None,
         filter: Optional[dict] = None,
         **kwargs: Any,
     ) -> Sequence[RowMapping]:
         """
-        Asynchronously query the database collection using optional filters and return matching rows.
-
-        Args:
-            k (Optional[int]): The maximum number of rows to retrieve. If not provided, a default is
-                computed based on the hybrid search configuration or a fallback value.
-            filter (Optional[dict]): A dictionary representing filtering conditions to apply in the SQL WHERE clause.
-            **kwargs (Any): Additional keyword arguments (currently unused but accepted for extensibility).
-
-        Returns:
-            Sequence[RowMapping]: A sequence of row mappings, representing a Document
-
-        Notes:
-            - If `k` is not specified, it defaults to the maximum of the configured top-k values.
-            - If `index_query_options` are set, they are applied using `SET LOCAL` before executing the query.
-        """
+        Asynchronously query the database collection using filters and parameters and return matching rows."""
 
-        if not k:
-            k = (
-                max(
-                    self.k,
-                    self.hybrid_search_config.primary_top_k,
-                    self.hybrid_search_config.secondary_top_k,
-                )
-                if self.hybrid_search_config
-                else self.k
-            )
+        limit = limit if limit is not None else self.k
+        offset = offset if offset is not None else 0
 
         columns = [
             self.id_column,
@@ -722,11 +701,12 @@ async def __query_collection_with_filter(
         if filter and isinstance(filter, dict):
             safe_filter, filter_dict = self._create_filter_clause(filter)
 
+        suffix_id = str(uuid.uuid4()).split("-")[0]
         where_filters = f"WHERE {safe_filter}" if safe_filter else ""
         dense_query_stmt = f"""SELECT {column_names}
-        FROM "{self.schema_name}"."{self.table_name}" {where_filters} LIMIT :k;
+        FROM "{self.schema_name}"."{self.table_name}" {where_filters} LIMIT :limit_{suffix_id} OFFSET :offset_{suffix_id};
         """
-        param_dict = {"k": k}
+        param_dict = {f"limit_{suffix_id}": limit, f"offset_{suffix_id}": offset}
         if filter_dict:
             param_dict.update(filter_dict)
         if self.index_query_options:
@@ -1072,28 +1052,14 @@ async def is_valid_index(
     async def aget(
         self,
         filter: Optional[dict] = None,
-        k: Optional[int] = None,
+        limit: Optional[int] = None,
+        offset: Optional[int] = None,
         **kwargs: Any,
     ) -> list[Document]:
-        """
-        Asynchronously retrieves documents from a collection based on an optional filter and other parameters.
-
-        This method queries the underlying collection using the provided filter and additional keyword arguments.
-        It constructs a list of `Document` objects from the query results, combining content and metadata from
-        specified columns.
-
-        Args:
-            filter (Optional[dict]): A dictionary specifying filtering criteria for the query. Defaults to None.
-            k (Optional[int]): The maximum number of documents to retrieve. If None, retrieves all matching documents.
-            **kwargs (Any): Additional keyword arguments passed to the internal query method.
-
-        Returns:
-            list[Document]: A list of `Document` instances, each containing content, metadata, and an identifier.
-
-        """
+        """Retrieve documents from the collection using filters and parameters."""
 
         results = await self.__query_collection_with_filter(
-            k=k, filter=filter, **kwargs
+            limit=limit, offset=offset, filter=filter, **kwargs
         )
 
         documents = []
@@ -1369,6 +1335,17 @@ def _create_filter_clause(self, filters: Any) -> tuple[str, dict]:
         else:
             return "", {}
 
+    def get(
+        self,
+        filter: Optional[dict] = None,
+        limit: Optional[int] = None,
+        offset: Optional[int] = None,
+        **kwargs: Any,
+    ) -> list[Document]:
+        raise NotImplementedError(
+            "Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead."
+        )
+
     def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
         raise NotImplementedError(
             "Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead."

diff --git a/langchain_postgres/v2/vectorstores.py b/langchain_postgres/v2/vectorstores.py
index eaf1561..d08260a 100644
--- a/langchain_postgres/v2/vectorstores.py
+++ b/langchain_postgres/v2/vectorstores.py
@@ -878,36 +878,24 @@ def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
     async def aget(
         self,
         filter: Optional[dict] = None,
-        k: Optional[int] = None,
+        limit: Optional[int] = None,
+        offset: Optional[int] = None,
         **kwargs: Any,
     ) -> list[Document]:
-
+        """Retrieve documents from the collection using filters and parameters."""
         return await self._engine._run_as_async(
-            self.__vs.aget(filter=filter, k=k, **kwargs)
+            self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs)
         )
 
     def get(
         self,
         filter: Optional[dict] = None,
-        k: Optional[int] = None,
+        limit: Optional[int] = None,
+        offset: Optional[int] = None,
         **kwargs: Any,
     ) -> list[Document]:
-        """
-        Retrieve documents from the collection using optional filters and parameters.
-
-        Args:
-            filter (Optional[dict]): A dictionary specifying filtering criteria for the query. Defaults to None.
-            k (Optional[int]): The maximum number of documents to retrieve. If None, retrieves all matching documents.
-            **kwargs (Any): Additional keyword arguments passed to the asynchronous `aget` method.
-
-        Returns:
-            list[Document]: A list of `Document` instances matching the filter criteria.
-
-        Raises:
-            Any exceptions raised by the underlying asynchronous method or the sync execution engine.
-        """
-
-        return self._engine._run_as_sync(self.__vs.aget(filter=filter, k=k, **kwargs))
+        """Retrieve documents from the collection using filters and parameters."""
+        return self._engine._run_as_sync(self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs))
 
     def get_table_name(self) -> str:
         return self.__vs.table_name

diff --git a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
index fc4860c..5a78dee 100644
--- a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
+++ b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
@@ -378,11 +378,22 @@ async def test_vectorstore_get(
         expected_ids: list[str],
     ) -> None:
         """Test end to end construction and filter."""
-        docs = await vs_custom_filter.aget(test_filter, k=5)
+        docs = await vs_custom_filter.aget(test_filter)
         assert set([doc.metadata["code"] for doc in docs]) == set(
             expected_ids
         ), test_filter
 
+    async def test_vectorstore_get_limit_offset(
+        self,
+        vs_custom_filter: AsyncPGVectorStore,
+    ) -> None:
+        """Test limit and offset parameters of get method"""
+
+        all_docs = await vs_custom_filter.aget()
+        docs_from_combining = (await vs_custom_filter.aget(limit=1)) + (await vs_custom_filter.aget(limit=1, offset=1)) + (await vs_custom_filter.aget(offset=2))
+
+        assert all_docs == docs_from_combining
+
     async def test_asimilarity_hybrid_search(self, vs: AsyncPGVectorStore) -> None:
         results = await vs.asimilarity_search(
             "foo", k=1, hybrid_search_config=HybridSearchConfig()

diff --git a/tests/unit_tests/v2/test_pg_vectorstore_search.py b/tests/unit_tests/v2/test_pg_vectorstore_search.py
index 9697883..dc3ec34 100644
--- a/tests/unit_tests/v2/test_pg_vectorstore_search.py
+++ b/tests/unit_tests/v2/test_pg_vectorstore_search.py
@@ -438,10 +438,21 @@ def test_sync_vectorstore_get(
     ) -> None:
         """Test end to end construction and filter."""
 
-        docs = vs_custom_filter_sync.get(k=5, filter=test_filter)
+        docs = vs_custom_filter_sync.get(filter=test_filter)
         assert set([doc.metadata["code"] for doc in docs]) == set(
             expected_ids
         ), test_filter
+
+    def test_sync_vectorstore_get_limit_offset(
+        self,
+        vs_custom_filter_sync: PGVectorStore,
+    ) -> None:
+        """Test limit and offset parameters of get method"""
+
+        all_docs = vs_custom_filter_sync.get()
+        docs_from_combining = vs_custom_filter_sync.get(limit=1) + vs_custom_filter_sync.get(limit=1, offset=1) + vs_custom_filter_sync.get(offset=2)
+
+        assert all_docs == docs_from_combining
 
     @pytest.mark.parametrize("test_filter", NEGATIVE_TEST_CASES)
     def test_metadata_filter_negative_tests(
         self, vs_custom_filter_sync: PGVectorStore, test_filter: dict
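With `k` replaced by `limit`/`offset` in the patch above, `get()` can page through a collection the way the new tests do — by stitching consecutive windows together. A hedged sketch against the patch-06 signature (`store` and the `topic` column are again illustrative assumptions):

```python
# Page through all rows matching a metadata filter, two at a time.
# `store` is an assumed, already-initialized PGVectorStore.
page = 0
while True:
    docs = store.get(filter={"topic": {"$eq": "food"}}, limit=2, offset=2 * page)
    if not docs:
        break  # offset has run past the last matching row
    for doc in docs:
        print(doc.id, doc.page_content)
    page += 1
```

Because the generated SELECT still has no ORDER BY, consecutive pages are only guaranteed to be disjoint and exhaustive if Postgres returns rows in a stable order across calls; the limit/offset tests added above rely on that behavior.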
From 90172606a9319f5ac526df0f812deecc5ca91a79 Mon Sep 17 00:00:00 2001
From: gRedHeadphone
Date: Thu, 23 Oct 2025 10:16:03 +0000
Subject: [PATCH 07/10] chore: ruff format + uv lock version update

---
 langchain_postgres/v2/async_vectorstore.py        |  2 +-
 langchain_postgres/v2/vectorstores.py             |  4 +++-
 .../v2/test_async_pg_vectorstore_search.py        | 12 ++++++++----
 tests/unit_tests/v2/test_pg_vectorstore_search.py | 14 +++++++++-----
 uv.lock                                           |  2 +-
 5 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py
index a860cfe..31aa8fa 100644
--- a/langchain_postgres/v2/async_vectorstore.py
+++ b/langchain_postgres/v2/async_vectorstore.py
@@ -1345,7 +1345,7 @@ def get(
         raise NotImplementedError(
             "Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead."
         )
-    
+
     def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
         raise NotImplementedError(
             "Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead."

diff --git a/langchain_postgres/v2/vectorstores.py b/langchain_postgres/v2/vectorstores.py
index d08260a..0107c00 100644
--- a/langchain_postgres/v2/vectorstores.py
+++ b/langchain_postgres/v2/vectorstores.py
@@ -895,7 +895,9 @@ def get(
         **kwargs: Any,
     ) -> list[Document]:
         """Retrieve documents from the collection using filters and parameters."""
-        return self._engine._run_as_sync(self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs))
+        return self._engine._run_as_sync(
+            self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs)
+        )
 
     def get_table_name(self) -> str:
         return self.__vs.table_name

diff --git a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
index 5a78dee..a252a72 100644
--- a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
+++ b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py
@@ -379,9 +379,9 @@ async def test_vectorstore_get(
     ) -> None:
         """Test end to end construction and filter."""
         docs = await vs_custom_filter.aget(test_filter)
-        assert set([doc.metadata["code"] for doc in docs]) == set(
-            expected_ids
-        ), test_filter
+        assert set([doc.metadata["code"] for doc in docs]) == set(expected_ids), (
+            test_filter
+        )
 
     async def test_vectorstore_get_limit_offset(
         self,
@@ -390,7 +390,11 @@ async def test_vectorstore_get_limit_offset(
         """Test limit and offset parameters of get method"""
 
         all_docs = await vs_custom_filter.aget()
-        docs_from_combining = (await vs_custom_filter.aget(limit=1)) + (await vs_custom_filter.aget(limit=1, offset=1)) + (await vs_custom_filter.aget(offset=2))
+        docs_from_combining = (
+            (await vs_custom_filter.aget(limit=1))
+            + (await vs_custom_filter.aget(limit=1, offset=1))
+            + (await vs_custom_filter.aget(offset=2))
+        )
 
         assert all_docs == docs_from_combining

diff --git a/tests/unit_tests/v2/test_pg_vectorstore_search.py b/tests/unit_tests/v2/test_pg_vectorstore_search.py
index dc3ec34..40ccbd0 100644
--- a/tests/unit_tests/v2/test_pg_vectorstore_search.py
+++ b/tests/unit_tests/v2/test_pg_vectorstore_search.py
@@ -439,10 +439,10 @@ def test_sync_vectorstore_get(
         """Test end to end construction and filter."""
 
         docs = vs_custom_filter_sync.get(filter=test_filter)
-        assert set([doc.metadata["code"] for doc in docs]) == set(
-            expected_ids
-        ), test_filter
-
+        assert set([doc.metadata["code"] for doc in docs]) == set(expected_ids), (
+            test_filter
+        )
+
     def test_sync_vectorstore_get_limit_offset(
         self,
         vs_custom_filter_sync: PGVectorStore,
@@ -450,7 +450,11 @@ def test_sync_vectorstore_get_limit_offset(
         """Test limit and offset parameters of get method"""
 
         all_docs = vs_custom_filter_sync.get()
-        docs_from_combining = vs_custom_filter_sync.get(limit=1) + vs_custom_filter_sync.get(limit=1, offset=1) + vs_custom_filter_sync.get(offset=2)
+        docs_from_combining = (
+            vs_custom_filter_sync.get(limit=1)
+            + vs_custom_filter_sync.get(limit=1, offset=1)
+            + vs_custom_filter_sync.get(offset=2)
+        )
 
         assert all_docs == docs_from_combining

diff --git a/uv.lock b/uv.lock
index 933849d..28773e3 100644
--- a/uv.lock
+++ b/uv.lock
@@ -621,7 +621,7 @@ wheels = [
 
 [[package]]
 name = "langchain-postgres"
-version = "0.0.15"
+version = "0.0.16"
 source = { editable = "." }
 dependencies = [
     { name = "asyncpg" },

From 9cd202c4d24d3f3fbc4a23e17fa9ec5515d040c1 Mon Sep 17 00:00:00 2001
From: gRedHeadphone
Date: Fri, 24 Oct 2025 05:45:48 +0000
Subject: [PATCH 08/10] docs: get method in vectorstore

---
 examples/pg_vectorstore.ipynb        | 24 ++++++++++++++++++++++++
 examples/pg_vectorstore_how_to.ipynb | 24 ++++++++++++++++++++++++
 2 files changed, 48 insertions(+)

diff --git a/examples/pg_vectorstore.ipynb b/examples/pg_vectorstore.ipynb
index 2c20e90..53e4897 100644
--- a/examples/pg_vectorstore.ipynb
+++ b/examples/pg_vectorstore.ipynb
@@ -283,6 +283,30 @@
     "await vectorstore.aadd_documents(documents=docs)"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Get documents\n",
+    "\n",
+    "Get documents from the vectorstore using filters and parameters."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "documents_with_apple = await vectorstore.aget({\"content\": {\"$ilike\": \"%apple%\"}})\n",
+    "first_three_documents = await vectorstore.aget(limit=3)\n",
+    "rest_of_documents = await vectorstore.aget(limit=5, offset=3)\n",
+    "\n",
+    "print([doc.page_content for doc in documents_with_apple])\n",
+    "print([doc.page_content for doc in first_three_documents])\n",
+    "print([doc.page_content for doc in rest_of_documents])"
+   ]
+  },
   {
    "cell_type": "markdown",
    "metadata": {},

diff --git a/examples/pg_vectorstore_how_to.ipynb b/examples/pg_vectorstore_how_to.ipynb
index 2c5e75a..9019e23 100644
--- a/examples/pg_vectorstore_how_to.ipynb
+++ b/examples/pg_vectorstore_how_to.ipynb
@@ -327,6 +327,30 @@
     "await store.aadd_texts(all_texts, metadatas=metadatas, ids=ids)"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Get documents\n",
+    "\n",
+    "Get documents from the vectorstore using filters and parameters."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "documents_with_apple = await store.aget({\"content\": {\"$ilike\": \"%apple%\"}})\n",
+    "first_three_documents = await store.aget(limit=3)\n",
+    "rest_of_documents = await store.aget(limit=5, offset=3)\n",
+    "\n",
+    "print([doc.page_content for doc in documents_with_apple])\n",
+    "print([doc.page_content for doc in first_three_documents])\n",
+    "print([doc.page_content for doc in rest_of_documents])"
+   ]
+  },
   {
    "cell_type": "markdown",
    "metadata": {},

From 147195000975b87cf7095dec14cc1bdc3169cfc3 Mon Sep 17 00:00:00 2001
From: gRedHeadphone
Date: Tue, 4 Nov 2025 09:27:01 +0000
Subject: [PATCH 09/10] chore: removing index query options + using limit and
 offset as None itself

---
 langchain_postgres/v2/async_vectorstore.py | 24 +++++-------------------
 1 file changed, 5 insertions(+), 19 deletions(-)

diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py
index 31aa8fa..769b6dc 100644
--- a/langchain_postgres/v2/async_vectorstore.py
+++ b/langchain_postgres/v2/async_vectorstore.py
@@ -680,11 +680,7 @@ async def __query_collection_with_filter(
         filter: Optional[dict] = None,
         **kwargs: Any,
     ) -> Sequence[RowMapping]:
-        """
-        Asynchronously query the database collection using filters and parameters and return matching rows."""
-
-        limit = limit if limit is not None else self.k
-        offset = offset if offset is not None else 0
+        """Asynchronously query the database collection using filters and parameters and return matching rows."""
 
         columns = [
             self.id_column,
@@ -709,20 +705,10 @@ async def __query_collection_with_filter(
         param_dict = {f"limit_{suffix_id}": limit, f"offset_{suffix_id}": offset}
         if filter_dict:
             param_dict.update(filter_dict)
-        if self.index_query_options:
-            async with self.engine.connect() as conn:
-                # Set each query option individually
-                for query_option in self.index_query_options.to_parameter():
-                    query_options_stmt = f"SET LOCAL {query_option};"
-                    await conn.execute(text(query_options_stmt))
-                result = await conn.execute(text(dense_query_stmt), param_dict)
-                result_map = result.mappings()
-                results = result_map.fetchall()
-        else:
-            async with self.engine.connect() as conn:
-                result = await conn.execute(text(dense_query_stmt), param_dict)
-                result_map = result.mappings()
-                results = result_map.fetchall()
+        async with self.engine.connect() as conn:
+            result = await conn.execute(text(dense_query_stmt), param_dict)
+            result_map = result.mappings()
+            results = result_map.fetchall()
 
         return results
\"location\": \"pond\", \"topic\": \"animals\"},\n", " ),\n", " Document(\n", - " id=uuid.uuid4(),\n", + " id=str(uuid.uuid4()),\n", " page_content=\"ducks are also found in the pond\",\n", " metadata={\"likes\": 30, \"location\": \"pond\", \"topic\": \"animals\"},\n", " ),\n", " Document(\n", - " id=uuid.uuid4(),\n", + " id=str(uuid.uuid4()),\n", " page_content=\"fresh apples are available at the market\",\n", " metadata={\"likes\": 20, \"location\": \"market\", \"topic\": \"food\"},\n", " ),\n", " Document(\n", - " id=uuid.uuid4(),\n", + " id=str(uuid.uuid4()),\n", " page_content=\"the market also sells fresh oranges\",\n", " metadata={\"likes\": 5, \"location\": \"market\", \"topic\": \"food\"},\n", " ),\n", @@ -287,9 +287,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Get documents\n", + "## Get collection\n", "\n", - "Get documents from the vectorstore using filters and parameters." + "Get collection from the vectorstore using filters and parameters." ] }, { @@ -298,13 +298,11 @@ "metadata": {}, "outputs": [], "source": [ - "documents_with_apple = await vectorstore.aget({\"content\": {\"$ilike\": \"%apple%\"}})\n", - "first_three_documents = await vectorstore.aget(limit=3)\n", - "rest_of_documents = await vectorstore.aget(limit=5, offset=3)\n", + "documents_with_apple = await vectorstore.aget(where_document={\"$ilike\": \"%apple%\"}, include=\"documents\")\n", + "paginated_ids = await vectorstore.aget(limit=3, offset=3)\n", "\n", - "print([doc.page_content for doc in documents_with_apple])\n", - "print([doc.page_content for doc in first_three_documents])\n", - "print([doc.page_content for doc in rest_of_documents])" + "print(documents_with_apple[\"documents\"])\n", + "print(paginated_ids[\"ids\"])" ] }, { diff --git a/examples/pg_vectorstore_how_to.ipynb b/examples/pg_vectorstore_how_to.ipynb index 9019e23..e238fe9 100644 --- a/examples/pg_vectorstore_how_to.ipynb +++ b/examples/pg_vectorstore_how_to.ipynb @@ -331,9 +331,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Get documents\n", + "### Get collection\n", "\n", - "Get documents from the vectorstore using filters and parameters." + "Get collection from the vectorstore using filters and parameters." 
] }, { @@ -342,13 +342,11 @@ "metadata": {}, "outputs": [], "source": [ - "documents_with_apple = await store.aget({\"content\": {\"$ilike\": \"%apple%\"}})\n", - "first_three_documents = await store.aget(limit=3)\n", - "rest_of_documents = await store.aget(limit=5, offset=3)\n", + "documents_with_apple = await store.aget(where_document={\"$ilike\": \"%apple%\"}, include=\"documents\")\n", + "paginated_ids = await store.aget(limit=3, offset=3)\n", "\n", - "print([doc.page_content for doc in documents_with_apple])\n", - "print([doc.page_content for doc in first_three_documents])\n", - "print([doc.page_content for doc in rest_of_documents])" + "print(documents_with_apple[\"documents\"])\n", + "print(paginated_ids[\"ids\"])" ] }, { diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py index 769b6dc..73f78f8 100644 --- a/langchain_postgres/v2/async_vectorstore.py +++ b/langchain_postgres/v2/async_vectorstore.py @@ -678,18 +678,11 @@ async def __query_collection_with_filter( limit: Optional[int] = None, offset: Optional[int] = None, filter: Optional[dict] = None, + columns: Optional[list[str]] = None, **kwargs: Any, ) -> Sequence[RowMapping]: """Asynchronously query the database collection using filters and parameters and return matching rows.""" - columns = [ - self.id_column, - self.content_column, - self.embedding_column, - ] + self.metadata_columns - if self.metadata_json_column: - columns.append(self.metadata_json_column) - column_names = ", ".join(f'"{col}"' for col in columns) safe_filter = None @@ -1037,35 +1030,68 @@ async def is_valid_index( async def aget( self, - filter: Optional[dict] = None, + ids: Optional[Sequence[str]] = None, + where: Optional[dict] = None, limit: Optional[int] = None, offset: Optional[int] = None, + where_document: Optional[dict] = None, + include: Optional[list[str]] = None, **kwargs: Any, - ) -> list[Document]: + ) -> dict[str, Any]: """Retrieve documents from the collection using filters and parameters.""" + filter = {} + if ids: + filter.update({self.id_column: {"$in": ids}}) + if where: + filter.update(where) + if where_document: + filter.update({self.content_column: where_document}) + + if include is None: + include = ["metadatas", "documents"] + + fields_mapping = { + "embeddings": [self.embedding_column], + "metadatas": self.metadata_columns + [self.metadata_json_column] + if self.metadata_json_column + else self.metadata_columns, + "documents": [self.content_column], + } + + included_fields = ["ids"] + columns = [self.id_column] + + for field, cols in fields_mapping.items(): + if field in include: + included_fields.append(field) + columns.extend(cols) results = await self.__query_collection_with_filter( - limit=limit, offset=offset, filter=filter, **kwargs + limit=limit, offset=offset, filter=filter, columns=columns, **kwargs ) - documents = [] + final_results = {field: [] for field in included_fields} + for row in results: - metadata = ( - row[self.metadata_json_column] - if self.metadata_json_column and row[self.metadata_json_column] - else {} - ) - for col in self.metadata_columns: - metadata[col] = row[col] - documents.append( - Document( - page_content=row[self.content_column], - metadata=metadata, - id=str(row[self.id_column]), - ), - ) + final_results["ids"].append(str(row[self.id_column])) - return documents + if "metadatas" in final_results: + metadata = ( + row.get(self.metadata_json_column) or {} + if self.metadata_json_column + else {} + ) + for col in self.metadata_columns: + metadata[col] = 
row[col] + final_results["metadatas"].append(metadata) + + if "documents" in final_results: + final_results["documents"].append(row[self.content_column]) + + if "embeddings" in final_results: + final_results["embeddings"].append(row[self.embedding_column]) + + return final_results async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]: """Get documents by ids.""" @@ -1323,11 +1349,14 @@ def _create_filter_clause(self, filters: Any) -> tuple[str, dict]: def get( self, - filter: Optional[dict] = None, + ids: Optional[Sequence[str]] = None, + where: Optional[dict] = None, limit: Optional[int] = None, offset: Optional[int] = None, + where_document: Optional[dict] = None, + include: Optional[list[str]] = None, **kwargs: Any, - ) -> list[Document]: + ) -> dict[str, Any]: raise NotImplementedError( "Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead." ) diff --git a/langchain_postgres/v2/vectorstores.py b/langchain_postgres/v2/vectorstores.py index 0107c00..dd0c0c6 100644 --- a/langchain_postgres/v2/vectorstores.py +++ b/langchain_postgres/v2/vectorstores.py @@ -877,26 +877,48 @@ def get_by_ids(self, ids: Sequence[str]) -> list[Document]: async def aget( self, - filter: Optional[dict] = None, + ids: Optional[Sequence[str]] = None, + where: Optional[dict] = None, limit: Optional[int] = None, offset: Optional[int] = None, + where_document: Optional[dict] = None, + include: Optional[list[str]] = None, **kwargs: Any, - ) -> list[Document]: + ) -> dict[str, Any]: """Retrieve documents from the collection using filters and parameters.""" return await self._engine._run_as_async( - self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs) + self.__vs.aget( + ids=ids, + where=where, + limit=limit, + offset=offset, + where_document=where_document, + include=include, + **kwargs, + ) ) def get( self, - filter: Optional[dict] = None, + ids: Optional[Sequence[str]] = None, + where: Optional[dict] = None, limit: Optional[int] = None, offset: Optional[int] = None, + where_document: Optional[dict] = None, + include: Optional[list[str]] = None, **kwargs: Any, - ) -> list[Document]: + ) -> dict[str, Any]: """Retrieve documents from the collection using filters and parameters.""" return self._engine._run_as_sync( - self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs) + self.__vs.aget( + ids=ids, + where=where, + limit=limit, + offset=offset, + where_document=where_document, + include=include, + **kwargs, + ) ) def get_table_name(self) -> str: diff --git a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py index a252a72..deefc77 100644 --- a/tests/unit_tests/v2/test_async_pg_vectorstore_search.py +++ b/tests/unit_tests/v2/test_async_pg_vectorstore_search.py @@ -370,6 +370,24 @@ async def test_vectorstore_with_metadata_filters( ) assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter + async def test_async_vectorstore_get_ids( + self, + vs_custom_filter: AsyncPGVectorStore + ) -> None: + """Test end to end construction and filter.""" + + res = await vs_custom_filter.aget(ids=ids[:2]) + assert set(res["ids"]) == set(ids[:2]) + + async def test_async_vectorstore_get_docs( + self, + vs_custom_filter: AsyncPGVectorStore + ) -> None: + """Test end to end construction and filter.""" + + res = await vs_custom_filter.aget(where_document={"$in": texts[:2]}) + assert set(res["documents"]) == set(texts[:2]) + @pytest.mark.parametrize("test_filter, expected_ids", 
FILTERING_TEST_CASES) async def test_vectorstore_get( self, @@ -378,8 +396,8 @@ async def test_vectorstore_get( expected_ids: list[str], ) -> None: """Test end to end construction and filter.""" - docs = await vs_custom_filter.aget(test_filter) - assert set([doc.metadata["code"] for doc in docs]) == set(expected_ids), ( + res = await vs_custom_filter.aget(where=test_filter) + assert set([r["code"] for r in res["metadatas"]]) == set(expected_ids), ( test_filter ) @@ -389,14 +407,14 @@ async def test_vectorstore_get_limit_offset( ) -> None: """Test limit and offset parameters of get method""" - all_docs = await vs_custom_filter.aget() - docs_from_combining = ( - (await vs_custom_filter.aget(limit=1)) - + (await vs_custom_filter.aget(limit=1, offset=1)) - + (await vs_custom_filter.aget(offset=2)) + all_ids = (await vs_custom_filter.aget())["ids"] + ids_from_combining = ( + (await vs_custom_filter.aget(limit=1))["ids"] + + (await vs_custom_filter.aget(limit=1, offset=1))["ids"] + + (await vs_custom_filter.aget(offset=2))["ids"] ) - assert all_docs == docs_from_combining + assert all_ids == ids_from_combining async def test_asimilarity_hybrid_search(self, vs: AsyncPGVectorStore) -> None: results = await vs.asimilarity_search( diff --git a/tests/unit_tests/v2/test_pg_vectorstore_search.py b/tests/unit_tests/v2/test_pg_vectorstore_search.py index 40ccbd0..a2dba46 100644 --- a/tests/unit_tests/v2/test_pg_vectorstore_search.py +++ b/tests/unit_tests/v2/test_pg_vectorstore_search.py @@ -429,6 +429,24 @@ def test_sync_vectorstore_with_metadata_filters( docs = vs_custom_filter_sync.similarity_search("meow", k=5, filter=test_filter) assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter + def test_sync_vectorstore_get_ids( + self, + vs_custom_filter_sync: PGVectorStore + ) -> None: + """Test end to end construction and filter.""" + + res = vs_custom_filter_sync.get(ids=ids[:2]) + assert set(res["ids"]) == set(ids[:2]) + + def test_sync_vectorstore_get_docs( + self, + vs_custom_filter_sync: PGVectorStore + ) -> None: + """Test end to end construction and filter.""" + + res = vs_custom_filter_sync.get(where_document={"$in": texts[:2]}) + assert set(res["documents"]) == set(texts[:2]) + @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES) def test_sync_vectorstore_get( self, @@ -438,8 +456,8 @@ def test_sync_vectorstore_get( ) -> None: """Test end to end construction and filter.""" - docs = vs_custom_filter_sync.get(filter=test_filter) - assert set([doc.metadata["code"] for doc in docs]) == set(expected_ids), ( + res = vs_custom_filter_sync.get(where=test_filter) + assert set([r["code"] for r in res["metadatas"]]) == set(expected_ids), ( test_filter ) @@ -449,14 +467,14 @@ def test_sync_vectorstore_get_limit_offset( ) -> None: """Test limit and offset parameters of get method""" - all_docs = vs_custom_filter_sync.get() - docs_from_combining = ( - vs_custom_filter_sync.get(limit=1) - + vs_custom_filter_sync.get(limit=1, offset=1) - + vs_custom_filter_sync.get(offset=2) + all_ids = vs_custom_filter_sync.get()["ids"] + ids_from_combining = ( + vs_custom_filter_sync.get(limit=1)["ids"] + + vs_custom_filter_sync.get(limit=1, offset=1)["ids"] + + vs_custom_filter_sync.get(offset=2)["ids"] ) - assert all_docs == docs_from_combining + assert all_ids == ids_from_combining @pytest.mark.parametrize("test_filter", NEGATIVE_TEST_CASES) def test_metadata_filter_negative_tests(
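The end state mirrors Chroma's collection API: instead of a list of `Document`s, `get()`/`aget()` return a dict of parallel lists keyed by `"ids"` plus whatever `include` selects from `"metadatas"`, `"documents"`, and `"embeddings"` (the first two are the default). A final hedged sketch — `store` is an assumed, already-initialized `PGVectorStore`, and the filter values are illustrative:

```python
# Chroma-style retrieval after patch 10. "ids" is always present in the
# result; the other keys depend on `include`.
res = store.get(
    where={"topic": {"$eq": "food"}},      # metadata filter
    where_document={"$ilike": "%apple%"},  # content filter
    limit=10,
    include=["documents", "metadatas"],
)
for doc_id, content, meta in zip(res["ids"], res["documents"], res["metadatas"]):
    print(doc_id, content, meta)
```

Passing `ids=[...]` is folded into the same filter as an `$in` match on the id column, so it composes with `where` and `where_document` rather than overriding them.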