Skip to content

Commit daff6b2

Browse files
committed
Use values clause in MERGE and add more comments
1 parent 5a48743 commit daff6b2

File tree

1 file changed

+95
-83
lines changed
  • libs/oracledb/langchain_oracledb/vectorstores

1 file changed

+95
-83
lines changed

libs/oracledb/langchain_oracledb/vectorstores/oraclevs.py

Lines changed: 95 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,15 +1221,19 @@ def _read_similarity_output(
12211221
return docs
12221222

12231223

1224+
# SQL queries to insert data into tables.
1225+
# INSERT_QUERY is used when we do not wish to update the row when there is a duplicate id.
1226+
# MERGE_QUERY is used when we wish to update the row when there is a duplicate id.
1227+
# Both expect values in the order (id, embedding, metadata, text).
1228+
12241229
INSERT_QUERY = (
12251230
"INSERT INTO {table_name} (id, embedding, metadata, text) VALUES ({values})"
12261231
)
12271232
MERGE_QUERY = """
12281233
MERGE INTO {table_name} t
12291234
USING (
1230-
SELECT {using}
1231-
FROM dual
1232-
) s
1235+
VALUES ({values})
1236+
) s(id, embedding, metadata, text)
12331237
ON (t.id = s.id)
12341238
WHEN MATCHED THEN
12351239
UPDATE SET
@@ -1471,12 +1475,25 @@ def add_texts(
14711475
**kwargs: Any,
14721476
) -> list[str]:
14731477
"""Add more texts to the vectorstore index.
1478+
1479+
Duplicate id handling behavior is controlled by the `mutate_on_duplicate`
1480+
parameter passed when creating an `OracleVS` instance:
1481+
If False: Existing rows with the same id are left unchanged;
1482+
duplicate rows are skipped and their ids are not returned
1483+
(i.e., they are not reported as successfully inserted).
1484+
If True: Existing rows with the same id are updated (upsert behavior);
1485+
returned ids include successful inserts and updates.
1486+
14741487
Args:
14751488
texts: Iterable of strings to add to the vectorstore.
14761489
metadatas: Optional list of metadatas associated with the texts.
14771490
ids: Optional list of ids for the texts that are being added to
14781491
the vector store.
14791492
kwargs: vectorstore specific parameters
1493+
1494+
Returns:
1495+
List[str]: The ids successfully inserted (and, when mutate_on_duplicate=True,
1496+
also those successfully updated).
14801497
"""
14811498

14821499
texts = list(texts)
@@ -1493,6 +1510,8 @@ def add_texts(
14931510
)
14941511
metadatas[i][INTERNAL_ID_KEY] = _id
14951512

1513+
# With OracleEmbeddings, embeddings are generated in the database during insert;
1514+
# they are not sent back to Python to be written again.
14961515
docs: Any
14971516
if not isinstance(self.embeddings, OracleEmbeddings):
14981517
embeddings = self._embed_documents(texts)
@@ -1519,27 +1538,34 @@ def add_texts(
15191538
if connection is None:
15201539
raise ValueError("Failed to acquire a connection.")
15211540
with connection.cursor() as cursor:
1541+
# self.mutate_on_duplicate controls how inserts handle existing IDs.
1542+
# If False:
1543+
# uses INSERT_QUERY.
1544+
# existing rows having the same ID as the inserted row are not updated.
1545+
# with batcherrors=True, duplicate rows are skipped and their IDs
1546+
# are not included in the `add_texts` return value (i.e., not
1547+
# reported as successfully inserted).
1548+
#
1549+
# If True:
1550+
# uses MERGE_QUERY.
1551+
# existing rows having the same ID as the inserted row are updated
1552+
# with the new data ("upsert").
1553+
# the ID is included in the `add_texts` return value,
1554+
# indicating a successful insert/update.
1555+
selected_query = (
1556+
INSERT_QUERY if not self.mutate_on_duplicate else MERGE_QUERY
1557+
)
15221558
if not isinstance(self.embeddings, OracleEmbeddings):
15231559
cursor.setinputsizes(
15241560
None, oracledb.DB_TYPE_VECTOR, oracledb.DB_TYPE_JSON, None
15251561
)
1526-
if not self.mutate_on_duplicate:
1527-
cursor.executemany(
1528-
INSERT_QUERY.format(
1529-
table_name=self.table_name, values=":1, :2, :3, :4"
1530-
),
1531-
docs,
1532-
batcherrors=True,
1533-
)
1534-
else:
1535-
cursor.executemany(
1536-
MERGE_QUERY.format(
1537-
table_name=self.table_name,
1538-
using=":1 id, :2 embedding, :3 metadata, :4 text",
1539-
),
1540-
docs,
1541-
batcherrors=True,
1542-
)
1562+
cursor.executemany(
1563+
selected_query.format(
1564+
table_name=self.table_name, values=":1, :2, :3, :4"
1565+
),
1566+
docs,
1567+
batcherrors=True,
1568+
)
15431569

15441570
else:
15451571
if self.embeddings.proxy:
@@ -1552,30 +1578,18 @@ def add_texts(
15521578
meta=oracledb.DB_TYPE_JSON, param=oracledb.DB_TYPE_JSON
15531579
)
15541580

1555-
if not self.mutate_on_duplicate:
1556-
cursor.executemany(
1557-
INSERT_QUERY.format(
1558-
table_name=self.table_name,
1559-
values=(
1560-
":id, dbms_vector_chain.utl_to_embedding(:text,json(:param)), " # noqa: E501
1561-
":meta, :text"
1562-
),
1563-
),
1564-
docs,
1565-
batcherrors=True,
1566-
)
1567-
else:
1568-
cursor.executemany(
1569-
MERGE_QUERY.format(
1570-
table_name=self.table_name,
1571-
using=(
1572-
":id id, :meta metadata, :text text, "
1573-
"dbms_vector_chain.utl_to_embedding(:text, json(:param)) embedding" # noqa: E501
1574-
),
1581+
cursor.executemany(
1582+
selected_query.format(
1583+
table_name=self.table_name,
1584+
values=(
1585+
":id, dbms_vector_chain.utl_to_embedding(:text,json(:param)), " # noqa: E501
1586+
":meta, :text"
15751587
),
1576-
docs,
1577-
batcherrors=True,
1578-
)
1588+
),
1589+
docs,
1590+
batcherrors=True,
1591+
)
1592+
15791593
error_indices = [error.offset for error in cursor.getbatcherrors()]
15801594
connection.commit()
15811595

@@ -1596,12 +1610,25 @@ async def aadd_texts(
15961610
**kwargs: Any,
15971611
) -> list[str]:
15981612
"""Add more texts to the vectorstore index, async.
1613+
1614+
Duplicate id handling behavior is controlled by the `mutate_on_duplicate`
1615+
parameter passed when creating an `OracleVS` instance:
1616+
If False: Existing rows with the same id are left unchanged;
1617+
duplicate rows are skipped and their ids are not returned
1618+
(i.e., they are not reported as successfully inserted).
1619+
If True: Existing rows with the same id are updated (upsert behavior);
1620+
returned ids include successful inserts and updates.
1621+
15991622
Args:
16001623
texts: Iterable of strings to add to the vectorstore.
16011624
metadatas: Optional list of metadatas associated with the texts.
16021625
ids: Optional list of ids for the texts that are being added to
16031626
the vector store.
16041627
kwargs: vectorstore specific parameters
1628+
1629+
Returns:
1630+
List[str]: The ids successfully inserted (and, when mutate_on_duplicate=True,
1631+
also those successfully updated).
16051632
"""
16061633

16071634
texts = list(texts)
@@ -1618,6 +1645,8 @@ async def aadd_texts(
16181645
)
16191646
metadatas[i][INTERNAL_ID_KEY] = _id
16201647

1648+
# With OracleEmbeddings, embeddings are generated in the database during insert;
1649+
# they are not sent back to Python to be written again.
16211650
docs: Any
16221651
if not isinstance(self.embeddings, OracleEmbeddings):
16231652
embeddings = await self._aembed_documents(texts)
@@ -1644,27 +1673,22 @@ async def context(connection: Any) -> List[str]:
16441673
if connection is None:
16451674
raise ValueError("Failed to acquire a connection.")
16461675
with connection.cursor() as cursor:
1676+
# self.mutate_on_duplicate controls how inserts handle existing IDs;
1677+
# behavior is identical to the synchronous `add_texts` method.
1678+
selected_query = (
1679+
INSERT_QUERY if not self.mutate_on_duplicate else MERGE_QUERY
1680+
)
16471681
if not isinstance(self.embeddings, OracleEmbeddings):
16481682
cursor.setinputsizes(
16491683
None, oracledb.DB_TYPE_VECTOR, oracledb.DB_TYPE_JSON, None
16501684
)
1651-
if not self.mutate_on_duplicate:
1652-
await cursor.executemany(
1653-
INSERT_QUERY.format(
1654-
table_name=self.table_name, values=":1, :2, :3, :4"
1655-
),
1656-
docs,
1657-
batcherrors=True,
1658-
)
1659-
else:
1660-
await cursor.executemany(
1661-
MERGE_QUERY.format(
1662-
table_name=self.table_name,
1663-
using=":1 id, :2 embedding, :3 metadata, :4 text",
1664-
),
1665-
docs,
1666-
batcherrors=True,
1667-
)
1685+
await cursor.executemany(
1686+
selected_query.format(
1687+
table_name=self.table_name, values=":1, :2, :3, :4"
1688+
),
1689+
docs,
1690+
batcherrors=True,
1691+
)
16681692
else:
16691693
if self.embeddings.proxy:
16701694
await cursor.execute(
@@ -1675,30 +1699,18 @@ async def context(connection: Any) -> List[str]:
16751699
cursor.setinputsizes(
16761700
meta=oracledb.DB_TYPE_JSON, param=oracledb.DB_TYPE_JSON
16771701
)
1678-
if not self.mutate_on_duplicate:
1679-
await cursor.executemany(
1680-
INSERT_QUERY.format(
1681-
table_name=self.table_name,
1682-
values=(
1683-
":id, dbms_vector_chain.utl_to_embedding(:text,json(:param)), " # noqa: E501
1684-
":meta, :text"
1685-
),
1686-
),
1687-
docs,
1688-
batcherrors=True,
1689-
)
1690-
else:
1691-
await cursor.executemany(
1692-
MERGE_QUERY.format(
1693-
table_name=self.table_name,
1694-
using=(
1695-
":id id, :meta metadata, :text text, "
1696-
"dbms_vector_chain.utl_to_embedding(:text, json(:param)) embedding" # noqa: E501
1697-
),
1702+
1703+
await cursor.executemany(
1704+
selected_query.format(
1705+
table_name=self.table_name,
1706+
values=(
1707+
":id, dbms_vector_chain.utl_to_embedding(:text,json(:param)), " # noqa: E501
1708+
":meta, :text"
16981709
),
1699-
docs,
1700-
batcherrors=True,
1701-
)
1710+
),
1711+
docs,
1712+
batcherrors=True,
1713+
)
17021714

17031715
error_indices = [error.offset for error in cursor.getbatcherrors()]
17041716
await connection.commit()

0 commit comments

Comments
 (0)