DEVELOPMENT.md (3 changes: 2 additions & 1 deletion)

@@ -13,7 +13,7 @@ source .venv/bin/activate

Install the package in editable mode.
```shell
-poetry install --with dev,test,lint
+uv sync --group test
```

Start PostgreSQL/PGVector.
@@ -28,5 +28,6 @@ docker run --rm -it --name pgvector-container \

Invoke the test cases.
```shell
+export POSTGRES_PORT=6024
pytest -vvv
```
examples/pg_vectorstore.ipynb (24 changes: 24 additions & 0 deletions)

@@ -283,6 +283,30 @@
"await vectorstore.aadd_documents(documents=docs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get documents\n",
"\n",
"Get documents from the vectorstore using filters and parameters."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"documents_with_apple = await vectorstore.aget({\"content\": {\"$ilike\": \"%apple%\"}})\n",
"first_three_documents = await vectorstore.aget(limit=3)\n",
"rest_of_documents = await vectorstore.aget(limit=5, offset=3)\n",
"\n",
"print([doc.page_content for doc in documents_with_apple])\n",
"print([doc.page_content for doc in first_three_documents])\n",
"print([doc.page_content for doc in rest_of_documents])"
]
},
  {
   "cell_type": "markdown",
   "metadata": {},
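An aside on the filter argument used above: `$ilike` is one operator in the dict-based filter syntax. A rough sketch of other shapes a filter can take (operator names beyond `$ilike` are assumptions based on PGVector-style filter conventions, not confirmed by this diff):

```python
# Hypothetical filter examples -- only "$ilike" appears in this PR; the other
# operator names are assumed from PGVector-style conventions.
by_topic = await vectorstore.aget({"topic": {"$eq": "fruit"}})
recent = await vectorstore.aget({"year": {"$gte": 2020}}, limit=10)
combined = await vectorstore.aget(
    {"$and": [{"topic": {"$eq": "fruit"}}, {"content": {"$ilike": "%apple%"}}]}
)
```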
examples/pg_vectorstore_how_to.ipynb (24 changes: 24 additions & 0 deletions)

@@ -327,6 +327,30 @@
"await store.aadd_texts(all_texts, metadatas=metadatas, ids=ids)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get documents\n",
"\n",
"Get documents from the vectorstore using filters and parameters."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"documents_with_apple = await store.aget({\"content\": {\"$ilike\": \"%apple%\"}})\n",
"first_three_documents = await store.aget(limit=3)\n",
"rest_of_documents = await store.aget(limit=5, offset=3)\n",
"\n",
"print([doc.page_content for doc in documents_with_apple])\n",
"print([doc.page_content for doc in first_three_documents])\n",
"print([doc.page_content for doc in rest_of_documents])"
]
},
  {
   "cell_type": "markdown",
   "metadata": {},
langchain_postgres/v2/async_vectorstore.py (97 changes: 97 additions & 0 deletions)

@@ -672,6 +672,60 @@ async def __query_collection(
            return combined_results
        return dense_results

    async def __query_collection_with_filter(
        self,
        *,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> Sequence[RowMapping]:
        """Asynchronously query the database collection using filters and parameters, returning matching rows."""
        limit = limit if limit is not None else self.k
        offset = offset if offset is not None else 0

        columns = [
            self.id_column,
            self.content_column,
            self.embedding_column,
        ] + self.metadata_columns
        if self.metadata_json_column:
            columns.append(self.metadata_json_column)

        column_names = ", ".join(f'"{col}"' for col in columns)

        safe_filter = None
        filter_dict = None
        if filter and isinstance(filter, dict):
            safe_filter, filter_dict = self._create_filter_clause(filter)

        # Random suffix keeps the bind-parameter names for this statement unique.
        suffix_id = str(uuid.uuid4()).split("-")[0]
        where_filters = f"WHERE {safe_filter}" if safe_filter else ""
        dense_query_stmt = f"""SELECT {column_names}
            FROM "{self.schema_name}"."{self.table_name}" {where_filters} LIMIT :limit_{suffix_id} OFFSET :offset_{suffix_id};
        """
        param_dict = {f"limit_{suffix_id}": limit, f"offset_{suffix_id}": offset}
        if filter_dict:
            param_dict.update(filter_dict)
        if self.index_query_options:
            async with self.engine.connect() as conn:
                # Set each query option individually
                for query_option in self.index_query_options.to_parameter():
                    query_options_stmt = f"SET LOCAL {query_option};"
                    await conn.execute(text(query_options_stmt))
                result = await conn.execute(text(dense_query_stmt), param_dict)
                result_map = result.mappings()
                results = result_map.fetchall()
        else:
            async with self.engine.connect() as conn:
                result = await conn.execute(text(dense_query_stmt), param_dict)
                result_map = result.mappings()
                results = result_map.fetchall()

        return results
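For intuition, a call such as `aget({"content": {"$ilike": "%apple%"}}, limit=3)` flows through this method roughly as sketched below. The WHERE fragment and parameter names produced by `_create_filter_clause` are assumptions for illustration; the real suffixes are random UUID fragments.

```python
# Illustrative only -- assumes _create_filter_clause renders "$ilike" as an
# ILIKE comparison bound to a named parameter. Names and suffixes are made up.
dense_query_stmt = '''SELECT "langchain_id", "content", "embedding"
    FROM "public"."my_table" WHERE "content" ILIKE :content_0 LIMIT :limit_ab12 OFFSET :offset_ab12;
'''
param_dict = {"content_0": "%apple%", "limit_ab12": 3, "offset_ab12": 0}
```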

    async def asimilarity_search(
        self,
        query: str,

@@ -995,6 +1049,38 @@ async def is_valid_index(
        results = result_map.fetchall()
        return bool(len(results) == 1)

    async def aget(

[Collaborator] Can we also support an `ids` input and just call the `get_by_ids` method?

[Contributor Author] But then the other parameters won't work. How about updating `filter` to support an ids filter?

        self,
        filter: Optional[dict] = None,

[Collaborator] We most likely should use "where" to match Chroma.

        limit: Optional[int] = None,
        offset: Optional[int] = None,
        **kwargs: Any,
    ) -> list[Document]:

[Collaborator] This method should return a dict with the keys "ids", "embeddings", "metadatas", and "documents" to match the Chroma implementation.

[Contributor Author] So should I also add an "include" parameter?

        """Retrieve documents from the collection using filters and parameters."""
        results = await self.__query_collection_with_filter(
            limit=limit, offset=offset, filter=filter, **kwargs
        )

        documents = []
        for row in results:
            # Prefer the JSON metadata column when present and non-empty.
            metadata = (
                row[self.metadata_json_column]
                if self.metadata_json_column and row[self.metadata_json_column]
                else {}
            )
            for col in self.metadata_columns:
                metadata[col] = row[col]
            documents.append(
                Document(
                    page_content=row[self.content_column],
                    metadata=metadata,
                    id=str(row[self.id_column]),
                ),
            )

        return documents
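A minimal usage sketch of `aget`, assuming an already-initialized `AsyncPGVectorStore` bound to the name `store`; the commented dict at the end is the Chroma-style shape the reviewer asks for above, not what this PR returns.

```python
# Hypothetical usage -- `store` is an already-initialized AsyncPGVectorStore.
docs = await store.aget({"content": {"$ilike": "%apple%"}}, limit=10)
for doc in docs:
    print(doc.id, doc.page_content, doc.metadata)

# For contrast, the Chroma-style return shape requested in review would look
# roughly like: {"ids": [...], "embeddings": [...], "metadatas": [...],
# "documents": [...]} -- not what this method returns today.
```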

    async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]:
        """Get documents by ids."""

@@ -1249,6 +1335,17 @@ def _create_filter_clause(self, filters: Any) -> tuple[str, dict]:
        else:
            return "", {}

    def get(
        self,
        filter: Optional[dict] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        **kwargs: Any,
    ) -> list[Document]:
        raise NotImplementedError(
            "Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead."
        )

    def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
        raise NotImplementedError(
            "Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead."
langchain_postgres/v2/vectorstores.py (24 changes: 24 additions & 0 deletions)

@@ -875,5 +875,29 @@ def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
"""Get documents by ids."""
return self._engine._run_as_sync(self.__vs.aget_by_ids(ids=ids))

    async def aget(
        self,
        filter: Optional[dict] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        **kwargs: Any,
    ) -> list[Document]:
        """Retrieve documents from the collection using filters and parameters."""
        return await self._engine._run_as_async(
            self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs)
        )

    def get(
        self,
        filter: Optional[dict] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        **kwargs: Any,
    ) -> list[Document]:
        """Retrieve documents from the collection using filters and parameters."""
        return self._engine._run_as_sync(
            self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs)
        )
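And the sync facade, for completeness: a sketch assuming a constructed `PGVectorStore` named `store`, with the filter syntax mirroring the notebook examples above.

```python
# Hypothetical usage -- `store` is a constructed PGVectorStore; the sync
# facade runs the async aget through the engine's event loop.
docs = store.get(filter={"content": {"$ilike": "%apple%"}}, limit=5, offset=0)
print([doc.page_content for doc in docs])
```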

    def get_table_name(self) -> str:
        return self.__vs.table_name
tests/unit_tests/v2/test_async_pg_vectorstore_search.py (28 changes: 28 additions & 0 deletions)

@@ -370,6 +370,34 @@ async def test_vectorstore_with_metadata_filters(
        )
        assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter

    @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES)
    async def test_vectorstore_get(
        self,
        vs_custom_filter: AsyncPGVectorStore,
        test_filter: dict,
        expected_ids: list[str],
    ) -> None:
        """Test end-to-end retrieval with filters."""
        docs = await vs_custom_filter.aget(test_filter)
        assert {doc.metadata["code"] for doc in docs} == set(expected_ids), test_filter

    async def test_vectorstore_get_limit_offset(
        self,
        vs_custom_filter: AsyncPGVectorStore,
    ) -> None:
        """Test the limit and offset parameters of the get method."""
        all_docs = await vs_custom_filter.aget()
        docs_from_combining = (
            (await vs_custom_filter.aget(limit=1))
            + (await vs_custom_filter.aget(limit=1, offset=1))
            + (await vs_custom_filter.aget(offset=2))
        )

        assert all_docs == docs_from_combining
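One caveat: this equality check assumes Postgres returns rows in a stable implicit order across all four queries, which is not guaranteed without an ORDER BY in the underlying statement; the same applies to the sync variant later in this section. An order-insensitive variant (hypothetical) would be:

```python
# Hypothetical order-insensitive form of the assertion above.
assert sorted(d.id or "" for d in all_docs) == sorted(
    d.id or "" for d in docs_from_combining
)
```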

    async def test_asimilarity_hybrid_search(self, vs: AsyncPGVectorStore) -> None:
        results = await vs.asimilarity_search(
            "foo", k=1, hybrid_search_config=HybridSearchConfig()
tests/unit_tests/v2/test_pg_vectorstore_search.py (29 changes: 29 additions & 0 deletions)

@@ -429,6 +429,35 @@ def test_sync_vectorstore_with_metadata_filters(
        docs = vs_custom_filter_sync.similarity_search("meow", k=5, filter=test_filter)
        assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter

    @pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES)
    def test_sync_vectorstore_get(
        self,
        vs_custom_filter_sync: PGVectorStore,
        test_filter: dict,
        expected_ids: list[str],
    ) -> None:
        """Test end-to-end retrieval with filters."""
        docs = vs_custom_filter_sync.get(filter=test_filter)
        assert {doc.metadata["code"] for doc in docs} == set(expected_ids), test_filter

    def test_sync_vectorstore_get_limit_offset(
        self,
        vs_custom_filter_sync: PGVectorStore,
    ) -> None:
        """Test the limit and offset parameters of the get method."""
        all_docs = vs_custom_filter_sync.get()
        docs_from_combining = (
            vs_custom_filter_sync.get(limit=1)
            + vs_custom_filter_sync.get(limit=1, offset=1)
            + vs_custom_filter_sync.get(offset=2)
        )

        assert all_docs == docs_from_combining

    @pytest.mark.parametrize("test_filter", NEGATIVE_TEST_CASES)
    def test_metadata_filter_negative_tests(
        self, vs_custom_filter_sync: PGVectorStore, test_filter: dict
uv.lock (2 changes: 1 addition & 1 deletion)

Some generated files are not rendered by default.