From 09c3a74c1d3ca665707794e9ae41610d90656805 Mon Sep 17 00:00:00 2001 From: Vits Date: Tue, 16 Jul 2024 22:47:56 +0200 Subject: [PATCH 1/6] Fixed --- semantic_router/index/pinecone.py | 56 ++++++++++++++++++++++++++++--- semantic_router/layer.py | 24 ++++++++++++- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index a578eb01..64312b3f 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -11,6 +11,7 @@ from semantic_router.index.base import BaseIndex from semantic_router.utils.logger import logger +from semantic_router.route import Route def clean_route_name(route_name: str) -> str: @@ -203,6 +204,28 @@ async def _init_async_index(self, force_create: bool = False): def _sync_index(self, local_routes: dict): remote_routes = self.get_routes() + if not local_routes["routes"]: + if self.sync != "remote": + raise ValueError( + "Local routes must be provided to sync the index if the sync setting is not 'remote'." + ) + else: + if not remote_routes: + raise ValueError("No routes found in the index.") + if ( + (self.sync in ["remote", "merge-force-remote"] and not remote_routes) + or ( + self.sync in ["error", "local", "merge-force-local"] + and not local_routes["routes"] + ) + or ( + self.sync == "merge" + and not remote_routes + and not local_routes["routes"] + ) + ): + raise ValueError("No routes found in the index.") + remote_dict: dict = {route: set() for route, _ in remote_routes} for route, utterance in remote_routes: remote_dict[route].add(utterance) @@ -215,6 +238,7 @@ def _sync_index(self, local_routes: dict): routes_to_add = [] routes_to_delete = [] + layer_routes = {} for route in all_routes: local_utterances = local_dict.get(route, set()) @@ -226,8 +250,11 @@ def _sync_index(self, local_routes: dict): f"Synchronization error: Differences found in route '{route}'" ) utterances_to_include: set = set() + layer_routes[route] = list(local_utterances) elif self.sync == "remote": utterances_to_include = set() + if remote_utterances: + layer_routes[route] = list(remote_utterances) elif self.sync == "local": utterances_to_include = local_utterances - remote_utterances routes_to_delete.extend( @@ -237,11 +264,16 @@ def _sync_index(self, local_routes: dict): if utterance not in local_utterances ] ) + layer_routes[route] = list(local_utterances) elif self.sync == "merge-force-remote": if route in local_dict and route not in remote_dict: utterances_to_include = local_utterances + if local_utterances: + layer_routes[route] = list(local_utterances) else: utterances_to_include = set() + if remote_utterances: + layer_routes[route] = list(remote_utterances) elif self.sync == "merge-force-local": if route in local_dict: utterances_to_include = local_utterances - remote_utterances @@ -252,10 +284,15 @@ def _sync_index(self, local_routes: dict): if utterance not in local_utterances ] ) + if local_utterances: + layer_routes[route] = local_utterances else: utterances_to_include = set() + if remote_utterances: + layer_routes[route] = list(remote_utterances) elif self.sync == "merge": utterances_to_include = local_utterances - remote_utterances + layer_routes[route] = list(remote_utterances.union(local_utterances)) else: raise ValueError("Invalid sync mode specified") @@ -272,7 +309,7 @@ def _sync_index(self, local_routes: dict): ] ) - return routes_to_add, routes_to_delete + return routes_to_add, routes_to_delete, layer_routes def _batch_upsert(self, batch: List[Dict]): """Helper method for upserting a single batch of records.""" @@ -308,8 +345,8 @@ def _add_and_sync( routes: List[str], utterances: List[str], batch_size: int = 100, - ): - """Add vectors to Pinecone in batches.""" + ) -> List[Route]: + """Add vectors to Pinecone in batches and return the overall updated list of Route objects.""" if self.index is None: self.dimensions = self.dimensions or len(embeddings[0]) self.index = self._init_index(force_create=True) @@ -320,7 +357,15 @@ def _add_and_sync( "embeddings": embeddings, } if self.sync is not None: - data_to_upsert, data_to_delete = self._sync_index(local_routes=local_routes) + data_to_upsert, data_to_delete, layer_routes_dict = self._sync_index( + local_routes=local_routes + ) + + layer_routes = [ + Route(name=route, utterances=layer_routes_dict[route]) + for route in layer_routes_dict.keys() + ] + routes_to_delete: dict = {} for route, utterance in data_to_delete: routes_to_delete.setdefault(route, []).append(utterance) @@ -335,6 +380,7 @@ def _add_and_sync( ] if ids_to_delete and self.index: self.index.delete(ids=ids_to_delete) + else: data_to_upsert = [ (vector, route, utterance) @@ -350,6 +396,8 @@ def _add_and_sync( batch = vectors_to_upsert[i : i + batch_size] self._batch_upsert(batch) + return layer_routes + def _get_route_ids(self, route_name: str): clean_route = clean_route_name(route_name) ids, _ = self._get_all(prefix=f"{clean_route}#") diff --git a/semantic_router/layer.py b/semantic_router/layer.py index 5c2d7228..20a325b7 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -220,6 +220,19 @@ def __init__( if len(self.routes) > 0: # initialize index now self._add_routes(routes=self.routes) + elif self.index.sync in ["merge", "remote", "merge-force-remote"]: + dummy_embedding = self.encoder(["dummy"]) + + layer_routes = self.index._add_and_sync( + embeddings=dummy_embedding, + routes=[], + utterances=[], + ) + self._set_layer_routes(layer_routes) + else: + raise ValueError( + "No routes provided for RouteLayer. Please provide routes or set sync to 'remote' if you want to use only remote routes." + ) def check_for_matching_routes(self, top_class: str) -> Optional[Route]: matching_routes = [route for route in self.routes if route.name == top_class] @@ -380,6 +393,14 @@ def _check_threshold(self, scores: List[float], route: Optional[Route]) -> bool: ) return self._pass_threshold(scores, threshold) + def _set_layer_routes(self, new_routes: List[Route]): + """ + Set and override the current routes with a new list of routes. + + :param new_routes: List of Route objects to set as the current routes. + """ + self.routes = new_routes + def __str__(self): return ( f"RouteLayer(encoder={self.encoder}, " @@ -466,11 +487,12 @@ def _add_routes(self, routes: List[Route]): # create route array route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index - self.index._add_and_sync( + layer_routes = self.index._add_and_sync( embeddings=embedded_utterances, routes=route_names, utterances=all_utterances, ) + self._set_layer_routes(layer_routes) def _encode(self, text: str) -> Any: """Given some text, encode it.""" From 78a4bef1968dd28ba2fc435704905a9743f920f6 Mon Sep 17 00:00:00 2001 From: Vits Date: Thu, 18 Jul 2024 00:40:33 +0200 Subject: [PATCH 2/6] Fixing PyTest errors --- semantic_router/index/pinecone.py | 33 +++++++++---------------------- semantic_router/layer.py | 6 +----- 2 files changed, 10 insertions(+), 29 deletions(-) diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 64312b3f..f4fa0b80 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -204,27 +204,6 @@ async def _init_async_index(self, force_create: bool = False): def _sync_index(self, local_routes: dict): remote_routes = self.get_routes() - if not local_routes["routes"]: - if self.sync != "remote": - raise ValueError( - "Local routes must be provided to sync the index if the sync setting is not 'remote'." - ) - else: - if not remote_routes: - raise ValueError("No routes found in the index.") - if ( - (self.sync in ["remote", "merge-force-remote"] and not remote_routes) - or ( - self.sync in ["error", "local", "merge-force-local"] - and not local_routes["routes"] - ) - or ( - self.sync == "merge" - and not remote_routes - and not local_routes["routes"] - ) - ): - raise ValueError("No routes found in the index.") remote_dict: dict = {route: set() for route, _ in remote_routes} for route, utterance in remote_routes: @@ -244,13 +223,17 @@ def _sync_index(self, local_routes: dict): local_utterances = local_dict.get(route, set()) remote_utterances = remote_dict.get(route, set()) + if not local_utterances and not remote_utterances: + continue + if self.sync == "error": if local_utterances != remote_utterances: raise ValueError( f"Synchronization error: Differences found in route '{route}'" ) utterances_to_include: set = set() - layer_routes[route] = list(local_utterances) + if local_utterances: + layer_routes[route] = list(local_utterances) elif self.sync == "remote": utterances_to_include = set() if remote_utterances: @@ -264,7 +247,8 @@ def _sync_index(self, local_routes: dict): if utterance not in local_utterances ] ) - layer_routes[route] = list(local_utterances) + if local_utterances: + layer_routes[route] = list(local_utterances) elif self.sync == "merge-force-remote": if route in local_dict and route not in remote_dict: utterances_to_include = local_utterances @@ -292,7 +276,8 @@ def _sync_index(self, local_routes: dict): layer_routes[route] = list(remote_utterances) elif self.sync == "merge": utterances_to_include = local_utterances - remote_utterances - layer_routes[route] = list(remote_utterances.union(local_utterances)) + if local_utterances or remote_utterances: + layer_routes[route] = list(remote_utterances.union(local_utterances)) else: raise ValueError("Invalid sync mode specified") diff --git a/semantic_router/layer.py b/semantic_router/layer.py index 20a325b7..f4042b6c 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -220,7 +220,7 @@ def __init__( if len(self.routes) > 0: # initialize index now self._add_routes(routes=self.routes) - elif self.index.sync in ["merge", "remote", "merge-force-remote"]: + elif self.index.sync: dummy_embedding = self.encoder(["dummy"]) layer_routes = self.index._add_and_sync( @@ -229,10 +229,6 @@ def __init__( utterances=[], ) self._set_layer_routes(layer_routes) - else: - raise ValueError( - "No routes provided for RouteLayer. Please provide routes or set sync to 'remote' if you want to use only remote routes." - ) def check_for_matching_routes(self, top_class: str) -> Optional[Route]: matching_routes = [route for route in self.routes if route.name == top_class] From c5dd5dd2e0381e37248d6e0bc3c0094cf61d7082 Mon Sep 17 00:00:00 2001 From: Vits Date: Thu, 18 Jul 2024 01:20:28 +0200 Subject: [PATCH 3/6] Fixed pytests --- semantic_router/layer.py | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/semantic_router/layer.py b/semantic_router/layer.py index f4042b6c..5852b8db 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -217,18 +217,21 @@ def __init__( if route.score_threshold is None: route.score_threshold = self.score_threshold # if routes list has been passed, we initialize index now - if len(self.routes) > 0: + if self.index.sync: # initialize index now - self._add_routes(routes=self.routes) - elif self.index.sync: - dummy_embedding = self.encoder(["dummy"]) + if len(self.routes) > 0: + self._add_and_sync_routes(routes=self.routes) + else: + dummy_embedding = self.encoder(["dummy"]) - layer_routes = self.index._add_and_sync( - embeddings=dummy_embedding, - routes=[], - utterances=[], - ) - self._set_layer_routes(layer_routes) + layer_routes = self.index._add_and_sync( + embeddings=dummy_embedding, + routes=[], + utterances=[], + ) + self._set_layer_routes(layer_routes) + elif len(self.routes) > 0: + self._add_routes(routes=self.routes) def check_for_matching_routes(self, top_class: str) -> Optional[Route]: matching_routes = [route for route in self.routes if route.name == top_class] @@ -483,6 +486,21 @@ def _add_routes(self, routes: List[Route]): # create route array route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index + self.index.add( + embeddings=embedded_utterances, + routes=route_names, + utterances=all_utterances, + ) + + def _add_and_sync_routes(self, routes: List[Route]): + # create embeddings for all routes and sync at startup with remote ones based on sync setting + all_utterances = [ + utterance for route in routes for utterance in route.utterances + ] + embedded_utterances = self.encoder(all_utterances) + # create route array + route_names = [route.name for route in routes for _ in route.utterances] + # add everything to the index layer_routes = self.index._add_and_sync( embeddings=embedded_utterances, routes=route_names, From ca531ea0e669b286263c515565e6d6f8e39577ac Mon Sep 17 00:00:00 2001 From: Vits Date: Thu, 18 Jul 2024 01:22:56 +0200 Subject: [PATCH 4/6] Formatting and linting --- semantic_router/index/pinecone.py | 4 +++- semantic_router/layer.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index f4fa0b80..858d56c9 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -277,7 +277,9 @@ def _sync_index(self, local_routes: dict): elif self.sync == "merge": utterances_to_include = local_utterances - remote_utterances if local_utterances or remote_utterances: - layer_routes[route] = list(remote_utterances.union(local_utterances)) + layer_routes[route] = list( + remote_utterances.union(local_utterances) + ) else: raise ValueError("Invalid sync mode specified") diff --git a/semantic_router/layer.py b/semantic_router/layer.py index 5852b8db..61824033 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -491,7 +491,7 @@ def _add_routes(self, routes: List[Route]): routes=route_names, utterances=all_utterances, ) - + def _add_and_sync_routes(self, routes: List[Route]): # create embeddings for all routes and sync at startup with remote ones based on sync setting all_utterances = [ From 7597bc355d8aedc38a8a41c6177f889cc9be8a7b Mon Sep 17 00:00:00 2001 From: Vits Date: Tue, 30 Jul 2024 18:19:57 +0200 Subject: [PATCH 5/6] Implemented/Modified remove and sync methods for index classes --- semantic_router/index/base.py | 13 ++--- semantic_router/index/local.py | 14 ++--- semantic_router/index/pinecone.py | 96 ++++++++----------------------- semantic_router/index/qdrant.py | 15 ++--- semantic_router/layer.py | 59 ++++++++++++------- 5 files changed, 77 insertions(+), 120 deletions(-) diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index 76388d1d..9eb99532 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -32,15 +32,10 @@ def add( This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") - - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[Any], - ): + + def _remove_and_sync(self, routes_to_delete: dict): """ - Add embeddings to the index and manage index syncing if necessary. + Remove embeddings in a routes syncing process from the index. This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") @@ -91,7 +86,7 @@ def delete_index(self): """ raise NotImplementedError("This method should be implemented by subclasses.") - def _sync_index(self, local_routes: dict): + def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): """ Synchronize the local index with the remote index based on the specified mode. Modes: diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index 5426ec76..dbc41f1a 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -42,15 +42,13 @@ def add( self.routes = np.concatenate([self.routes, routes_arr]) self.utterances = np.concatenate([self.utterances, utterances_arr]) - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], - ): + def _remove_and_sync(self, routes_to_delete: dict): + if self.sync is not None: + logger.warning("Sync remove is not implemented for LocalIndex.") + + def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): if self.sync is not None: - logger.warning("Sync add is not implemented for LocalIndex.") - self.add(embeddings, routes, utterances) + logger.error("Sync remove is not implemented for LocalIndex.") def get_routes(self) -> List[Tuple]: """ diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 858d56c9..e88fa148 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -202,17 +202,24 @@ async def _init_async_index(self, force_create: bool = False): logger.warning("Index could not be initialized.") self.host = index_stats["host"] if index_stats else None - def _sync_index(self, local_routes: dict): + def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): + if self.index is None: + self.dimensions = self.dimensions or dimensions + self.index = self._init_index(force_create=True) + remote_routes = self.get_routes() remote_dict: dict = {route: set() for route, _ in remote_routes} for route, utterance in remote_routes: remote_dict[route].add(utterance) - local_dict: dict = {route: set() for route in local_routes["routes"]} - for route, utterance in zip(local_routes["routes"], local_routes["utterances"]): + local_dict: dict = {route: set() for route in local_route_names} + for route, utterance in zip(local_route_names, local_utterances): local_dict[route].add(utterance) + logger.info(f"Local routes: {local_dict}") + logger.info(f"Remote routes: {remote_dict}") + all_routes = set(remote_dict.keys()).union(local_dict.keys()) routes_to_add = [] @@ -284,17 +291,9 @@ def _sync_index(self, local_routes: dict): raise ValueError("Invalid sync mode specified") for utterance in utterances_to_include: - indices = [ - i - for i, x in enumerate(local_routes["utterances"]) - if x == utterance and local_routes["routes"][i] == route - ] - routes_to_add.extend( - [ - (local_routes["embeddings"][idx], route, utterance) - for idx in indices - ] - ) + routes_to_add.append((route, utterance)) + + logger.info(f"Layer routes: {layer_routes}") return routes_to_add, routes_to_delete, layer_routes @@ -325,65 +324,18 @@ def add( for i in range(0, len(vectors_to_upsert), batch_size): batch = vectors_to_upsert[i : i + batch_size] self._batch_upsert(batch) - - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], - batch_size: int = 100, - ) -> List[Route]: - """Add vectors to Pinecone in batches and return the overall updated list of Route objects.""" - if self.index is None: - self.dimensions = self.dimensions or len(embeddings[0]) - self.index = self._init_index(force_create=True) - - local_routes = { - "routes": routes, - "utterances": utterances, - "embeddings": embeddings, - } - if self.sync is not None: - data_to_upsert, data_to_delete, layer_routes_dict = self._sync_index( - local_routes=local_routes - ) - - layer_routes = [ - Route(name=route, utterances=layer_routes_dict[route]) - for route in layer_routes_dict.keys() + + def _remove_and_sync(self, routes_to_delete: dict): + for route, utterances in routes_to_delete.items(): + remote_routes = self._get_routes_with_ids(route_name=route) + ids_to_delete = [ + r["id"] + for r in remote_routes + if (r["route"], r["utterance"]) + in zip([route] * len(utterances), utterances) ] - - routes_to_delete: dict = {} - for route, utterance in data_to_delete: - routes_to_delete.setdefault(route, []).append(utterance) - - for route, utterances in routes_to_delete.items(): - remote_routes = self._get_routes_with_ids(route_name=route) - ids_to_delete = [ - r["id"] - for r in remote_routes - if (r["route"], r["utterance"]) - in zip([route] * len(utterances), utterances) - ] - if ids_to_delete and self.index: - self.index.delete(ids=ids_to_delete) - - else: - data_to_upsert = [ - (vector, route, utterance) - for vector, route, utterance in zip(embeddings, routes, utterances) - ] - - vectors_to_upsert = [ - PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() - for vector, route, utterance in data_to_upsert - ] - - for i in range(0, len(vectors_to_upsert), batch_size): - batch = vectors_to_upsert[i : i + batch_size] - self._batch_upsert(batch) - - return layer_routes + if ids_to_delete and self.index: + self.index.delete(ids=ids_to_delete) def _get_route_ids(self, route_name: str): clean_route = clean_route_name(route_name) diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index 0fff2314..a77e6f88 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -160,16 +160,13 @@ def _init_collection(self) -> None: **self.config, ) - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], - batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, - ): + def _remove_and_sync(self, routes_to_delete: dict): + if self.sync is not None: + logger.error("Sync remove is not implemented for LocalIndex.") + + def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): if self.sync is not None: - logger.warning("Sync add is not implemented for QdrantIndex") - self.add(embeddings, routes, utterances, batch_size) + logger.error("Sync remove is not implemented for QdrantIndex.") def add( self, diff --git a/semantic_router/layer.py b/semantic_router/layer.py index 61824033..bbe291ce 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -222,14 +222,7 @@ def __init__( if len(self.routes) > 0: self._add_and_sync_routes(routes=self.routes) else: - dummy_embedding = self.encoder(["dummy"]) - - layer_routes = self.index._add_and_sync( - embeddings=dummy_embedding, - routes=[], - utterances=[], - ) - self._set_layer_routes(layer_routes) + self._add_and_sync_routes(routes=[]) elif len(self.routes) > 0: self._add_routes(routes=self.routes) @@ -479,12 +472,9 @@ def _refresh_routes(self): def _add_routes(self, routes: List[Route]): # create embeddings for all routes - all_utterances = [ - utterance for route in routes for utterance in route.utterances - ] + route_names, all_utterances = self._extract_routes_details(routes) embedded_utterances = self.encoder(all_utterances) # create route array - route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index self.index.add( embeddings=embedded_utterances, @@ -492,22 +482,47 @@ def _add_routes(self, routes: List[Route]): utterances=all_utterances, ) + def _add_and_sync_routes(self, routes: List[Route]): # create embeddings for all routes and sync at startup with remote ones based on sync setting - all_utterances = [ - utterance for route in routes for utterance in route.utterances + local_route_names, local_utterances = self._extract_routes_details(routes) + routes_to_add, routes_to_delete, layer_routes_dict = self.index._sync_index( + local_route_names=local_route_names, + local_utterances=local_utterances, + dimensions=len(self.encoder(["dummy"])[0]) + ) + + logger.info(f"ROUTES TO ADD: {(routes_to_add)}") + logger.info(f"ROUTES TO DELETE: {(routes_to_delete)}") + + layer_routes = [ + Route(name=route, utterances=layer_routes_dict[route]) + for route in layer_routes_dict.keys() ] - embedded_utterances = self.encoder(all_utterances) - # create route array - route_names = [route.name for route in routes for _ in route.utterances] - # add everything to the index - layer_routes = self.index._add_and_sync( - embeddings=embedded_utterances, - routes=route_names, - utterances=all_utterances, + + data_to_delete: dict = {} + for route, utterance in routes_to_delete: + data_to_delete.setdefault(route, []).append(utterance) + self.index._remove_and_sync(data_to_delete) + + all_utterances_to_add = [utt for _, utt in routes_to_add] + embedded_utterances_to_add = self.encoder(all_utterances_to_add) if all_utterances_to_add else [] + + route_names_to_add = [route for route, _, in routes_to_add] + + self.index.add( + embeddings=embedded_utterances_to_add, + routes=route_names_to_add, + utterances=all_utterances_to_add, ) + self._set_layer_routes(layer_routes) + def _extract_routes_details(self, routes: List[Route]) -> Tuple[List[str], List[str]]: + route_names = [route.name for route in routes for _ in route.utterances] + utterances = [utterance for route in routes for utterance in route.utterances] + return route_names, utterances + def _encode(self, text: str) -> Any: """Given some text, encode it.""" # create query vector From 3f37550ef291934408b1ce89c0adb8122867174b Mon Sep 17 00:00:00 2001 From: Vits Date: Tue, 30 Jul 2024 18:24:16 +0200 Subject: [PATCH 6/6] Revert "Implemented/Modified remove and sync methods for index classes" This reverts commit 7597bc355d8aedc38a8a41c6177f889cc9be8a7b. --- semantic_router/index/base.py | 13 +++-- semantic_router/index/local.py | 14 +++-- semantic_router/index/pinecone.py | 96 +++++++++++++++++++++++-------- semantic_router/index/qdrant.py | 15 +++-- semantic_router/layer.py | 59 +++++++------------ 5 files changed, 120 insertions(+), 77 deletions(-) diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index 9eb99532..76388d1d 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -32,10 +32,15 @@ def add( This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") - - def _remove_and_sync(self, routes_to_delete: dict): + + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[Any], + ): """ - Remove embeddings in a routes syncing process from the index. + Add embeddings to the index and manage index syncing if necessary. This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") @@ -86,7 +91,7 @@ def delete_index(self): """ raise NotImplementedError("This method should be implemented by subclasses.") - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): + def _sync_index(self, local_routes: dict): """ Synchronize the local index with the remote index based on the specified mode. Modes: diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index dbc41f1a..5426ec76 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -42,13 +42,15 @@ def add( self.routes = np.concatenate([self.routes, routes_arr]) self.utterances = np.concatenate([self.utterances, utterances_arr]) - def _remove_and_sync(self, routes_to_delete: dict): - if self.sync is not None: - logger.warning("Sync remove is not implemented for LocalIndex.") - - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + ): if self.sync is not None: - logger.error("Sync remove is not implemented for LocalIndex.") + logger.warning("Sync add is not implemented for LocalIndex.") + self.add(embeddings, routes, utterances) def get_routes(self) -> List[Tuple]: """ diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index e88fa148..858d56c9 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -202,24 +202,17 @@ async def _init_async_index(self, force_create: bool = False): logger.warning("Index could not be initialized.") self.host = index_stats["host"] if index_stats else None - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): - if self.index is None: - self.dimensions = self.dimensions or dimensions - self.index = self._init_index(force_create=True) - + def _sync_index(self, local_routes: dict): remote_routes = self.get_routes() remote_dict: dict = {route: set() for route, _ in remote_routes} for route, utterance in remote_routes: remote_dict[route].add(utterance) - local_dict: dict = {route: set() for route in local_route_names} - for route, utterance in zip(local_route_names, local_utterances): + local_dict: dict = {route: set() for route in local_routes["routes"]} + for route, utterance in zip(local_routes["routes"], local_routes["utterances"]): local_dict[route].add(utterance) - logger.info(f"Local routes: {local_dict}") - logger.info(f"Remote routes: {remote_dict}") - all_routes = set(remote_dict.keys()).union(local_dict.keys()) routes_to_add = [] @@ -291,9 +284,17 @@ def _sync_index(self, local_route_names: List[str], local_utterances: List[str], raise ValueError("Invalid sync mode specified") for utterance in utterances_to_include: - routes_to_add.append((route, utterance)) - - logger.info(f"Layer routes: {layer_routes}") + indices = [ + i + for i, x in enumerate(local_routes["utterances"]) + if x == utterance and local_routes["routes"][i] == route + ] + routes_to_add.extend( + [ + (local_routes["embeddings"][idx], route, utterance) + for idx in indices + ] + ) return routes_to_add, routes_to_delete, layer_routes @@ -324,18 +325,65 @@ def add( for i in range(0, len(vectors_to_upsert), batch_size): batch = vectors_to_upsert[i : i + batch_size] self._batch_upsert(batch) - - def _remove_and_sync(self, routes_to_delete: dict): - for route, utterances in routes_to_delete.items(): - remote_routes = self._get_routes_with_ids(route_name=route) - ids_to_delete = [ - r["id"] - for r in remote_routes - if (r["route"], r["utterance"]) - in zip([route] * len(utterances), utterances) + + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + batch_size: int = 100, + ) -> List[Route]: + """Add vectors to Pinecone in batches and return the overall updated list of Route objects.""" + if self.index is None: + self.dimensions = self.dimensions or len(embeddings[0]) + self.index = self._init_index(force_create=True) + + local_routes = { + "routes": routes, + "utterances": utterances, + "embeddings": embeddings, + } + if self.sync is not None: + data_to_upsert, data_to_delete, layer_routes_dict = self._sync_index( + local_routes=local_routes + ) + + layer_routes = [ + Route(name=route, utterances=layer_routes_dict[route]) + for route in layer_routes_dict.keys() ] - if ids_to_delete and self.index: - self.index.delete(ids=ids_to_delete) + + routes_to_delete: dict = {} + for route, utterance in data_to_delete: + routes_to_delete.setdefault(route, []).append(utterance) + + for route, utterances in routes_to_delete.items(): + remote_routes = self._get_routes_with_ids(route_name=route) + ids_to_delete = [ + r["id"] + for r in remote_routes + if (r["route"], r["utterance"]) + in zip([route] * len(utterances), utterances) + ] + if ids_to_delete and self.index: + self.index.delete(ids=ids_to_delete) + + else: + data_to_upsert = [ + (vector, route, utterance) + for vector, route, utterance in zip(embeddings, routes, utterances) + ] + + vectors_to_upsert = [ + PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() + for vector, route, utterance in data_to_upsert + ] + + for i in range(0, len(vectors_to_upsert), batch_size): + batch = vectors_to_upsert[i : i + batch_size] + self._batch_upsert(batch) + + return layer_routes def _get_route_ids(self, route_name: str): clean_route = clean_route_name(route_name) diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index a77e6f88..0fff2314 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -160,13 +160,16 @@ def _init_collection(self) -> None: **self.config, ) - def _remove_and_sync(self, routes_to_delete: dict): - if self.sync is not None: - logger.error("Sync remove is not implemented for LocalIndex.") - - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, + ): if self.sync is not None: - logger.error("Sync remove is not implemented for QdrantIndex.") + logger.warning("Sync add is not implemented for QdrantIndex") + self.add(embeddings, routes, utterances, batch_size) def add( self, diff --git a/semantic_router/layer.py b/semantic_router/layer.py index bbe291ce..61824033 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -222,7 +222,14 @@ def __init__( if len(self.routes) > 0: self._add_and_sync_routes(routes=self.routes) else: - self._add_and_sync_routes(routes=[]) + dummy_embedding = self.encoder(["dummy"]) + + layer_routes = self.index._add_and_sync( + embeddings=dummy_embedding, + routes=[], + utterances=[], + ) + self._set_layer_routes(layer_routes) elif len(self.routes) > 0: self._add_routes(routes=self.routes) @@ -472,9 +479,12 @@ def _refresh_routes(self): def _add_routes(self, routes: List[Route]): # create embeddings for all routes - route_names, all_utterances = self._extract_routes_details(routes) + all_utterances = [ + utterance for route in routes for utterance in route.utterances + ] embedded_utterances = self.encoder(all_utterances) # create route array + route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index self.index.add( embeddings=embedded_utterances, @@ -482,47 +492,22 @@ def _add_routes(self, routes: List[Route]): utterances=all_utterances, ) - def _add_and_sync_routes(self, routes: List[Route]): # create embeddings for all routes and sync at startup with remote ones based on sync setting - local_route_names, local_utterances = self._extract_routes_details(routes) - routes_to_add, routes_to_delete, layer_routes_dict = self.index._sync_index( - local_route_names=local_route_names, - local_utterances=local_utterances, - dimensions=len(self.encoder(["dummy"])[0]) - ) - - logger.info(f"ROUTES TO ADD: {(routes_to_add)}") - logger.info(f"ROUTES TO DELETE: {(routes_to_delete)}") - - layer_routes = [ - Route(name=route, utterances=layer_routes_dict[route]) - for route in layer_routes_dict.keys() + all_utterances = [ + utterance for route in routes for utterance in route.utterances ] - - data_to_delete: dict = {} - for route, utterance in routes_to_delete: - data_to_delete.setdefault(route, []).append(utterance) - self.index._remove_and_sync(data_to_delete) - - all_utterances_to_add = [utt for _, utt in routes_to_add] - embedded_utterances_to_add = self.encoder(all_utterances_to_add) if all_utterances_to_add else [] - - route_names_to_add = [route for route, _, in routes_to_add] - - self.index.add( - embeddings=embedded_utterances_to_add, - routes=route_names_to_add, - utterances=all_utterances_to_add, + embedded_utterances = self.encoder(all_utterances) + # create route array + route_names = [route.name for route in routes for _ in route.utterances] + # add everything to the index + layer_routes = self.index._add_and_sync( + embeddings=embedded_utterances, + routes=route_names, + utterances=all_utterances, ) - self._set_layer_routes(layer_routes) - def _extract_routes_details(self, routes: List[Route]) -> Tuple[List[str], List[str]]: - route_names = [route.name for route in routes for _ in route.utterances] - utterances = [utterance for route in routes for utterance in route.utterances] - return route_names, utterances - def _encode(self, text: str) -> Any: """Given some text, encode it.""" # create query vector