Skip to content

Commit

Permalink
Revert "Implemented/Modified remove and sync methods for index classes"
Browse files (browse the repository at this point in the history)
This reverts commit 7597bc3.
  • Loading branch information
Vits-99 committed Jul 30, 2024
1 parent 7597bc3 commit 3f37550
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 77 deletions.
13 changes: 9 additions & 4 deletions semantic_router/index/base.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -32,10 +32,15 @@ def add(
This method should be implemented by subclasses.
"""
raise NotImplementedError("This method should be implemented by subclasses.")

def _remove_and_sync(self, routes_to_delete: dict):

def _add_and_sync(
self,
embeddings: List[List[float]],
routes: List[str],
utterances: List[Any],
):
"""
Remove embeddings in a routes syncing process from the index.
Add embeddings to the index and manage index syncing if necessary.
This method should be implemented by subclasses.
"""
raise NotImplementedError("This method should be implemented by subclasses.")
Expand Down Expand Up @@ -86,7 +91,7 @@ def delete_index(self):
"""
raise NotImplementedError("This method should be implemented by subclasses.")

def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int):
def _sync_index(self, local_routes: dict):
"""
Synchronize the local index with the remote index based on the specified mode.
Modes:
Expand Down
14 changes: 8 additions & 6 deletions semantic_router/index/local.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -42,13 +42,15 @@ def add(
self.routes = np.concatenate([self.routes, routes_arr])
self.utterances = np.concatenate([self.utterances, utterances_arr])

def _remove_and_sync(self, routes_to_delete: dict):
if self.sync is not None:
logger.warning("Sync remove is not implemented for LocalIndex.")

def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int):
def _add_and_sync(
self,
embeddings: List[List[float]],
routes: List[str],
utterances: List[str],
):
if self.sync is not None:
logger.error("Sync remove is not implemented for LocalIndex.")
logger.warning("Sync add is not implemented for LocalIndex.")
self.add(embeddings, routes, utterances)

def get_routes(self) -> List[Tuple]:
"""
Expand Down
96 changes: 72 additions & 24 deletions semantic_router/index/pinecone.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -202,24 +202,17 @@ async def _init_async_index(self, force_create: bool = False):
logger.warning("Index could not be initialized.")
self.host = index_stats["host"] if index_stats else None

def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int):
if self.index is None:
self.dimensions = self.dimensions or dimensions
self.index = self._init_index(force_create=True)

def _sync_index(self, local_routes: dict):
remote_routes = self.get_routes()

remote_dict: dict = {route: set() for route, _ in remote_routes}
for route, utterance in remote_routes:
remote_dict[route].add(utterance)

local_dict: dict = {route: set() for route in local_route_names}
for route, utterance in zip(local_route_names, local_utterances):
local_dict: dict = {route: set() for route in local_routes["routes"]}
for route, utterance in zip(local_routes["routes"], local_routes["utterances"]):
local_dict[route].add(utterance)

logger.info(f"Local routes: {local_dict}")
logger.info(f"Remote routes: {remote_dict}")

all_routes = set(remote_dict.keys()).union(local_dict.keys())

routes_to_add = []
Expand Down Expand Up @@ -291,9 +284,17 @@ def _sync_index(self, local_route_names: List[str], local_utterances: List[str],
raise ValueError("Invalid sync mode specified")

for utterance in utterances_to_include:
routes_to_add.append((route, utterance))

logger.info(f"Layer routes: {layer_routes}")
indices = [
i
for i, x in enumerate(local_routes["utterances"])
if x == utterance and local_routes["routes"][i] == route
]
routes_to_add.extend(
[
(local_routes["embeddings"][idx], route, utterance)
for idx in indices
]
)

return routes_to_add, routes_to_delete, layer_routes

Check warning on line 299 in semantic_router/index/pinecone.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/index/pinecone.py#L299

Added line #L299 was not covered by tests

Expand Down Expand Up @@ -324,18 +325,65 @@ def add(
for i in range(0, len(vectors_to_upsert), batch_size):
batch = vectors_to_upsert[i : i + batch_size]
self._batch_upsert(batch)

def _remove_and_sync(self, routes_to_delete: dict):
for route, utterances in routes_to_delete.items():
remote_routes = self._get_routes_with_ids(route_name=route)
ids_to_delete = [
r["id"]
for r in remote_routes
if (r["route"], r["utterance"])
in zip([route] * len(utterances), utterances)

def _add_and_sync(
self,
embeddings: List[List[float]],
routes: List[str],
utterances: List[str],
batch_size: int = 100,
) -> List[Route]:
"""Add vectors to Pinecone in batches and return the overall updated list of Route objects."""
if self.index is None:
self.dimensions = self.dimensions or len(embeddings[0])
self.index = self._init_index(force_create=True)

local_routes = {
"routes": routes,
"utterances": utterances,
"embeddings": embeddings,
}
if self.sync is not None:
data_to_upsert, data_to_delete, layer_routes_dict = self._sync_index(

Check warning on line 347 in semantic_router/index/pinecone.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/index/pinecone.py#L347

Added line #L347 was not covered by tests
local_routes=local_routes
)

layer_routes = [

Check warning on line 351 in semantic_router/index/pinecone.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/index/pinecone.py#L351

Added line #L351 was not covered by tests
Route(name=route, utterances=layer_routes_dict[route])
for route in layer_routes_dict.keys()
]
if ids_to_delete and self.index:
self.index.delete(ids=ids_to_delete)

routes_to_delete: dict = {}
for route, utterance in data_to_delete:
routes_to_delete.setdefault(route, []).append(utterance)

for route, utterances in routes_to_delete.items():
remote_routes = self._get_routes_with_ids(route_name=route)
ids_to_delete = [
r["id"]
for r in remote_routes
if (r["route"], r["utterance"])
in zip([route] * len(utterances), utterances)
]
if ids_to_delete and self.index:
self.index.delete(ids=ids_to_delete)

else:
data_to_upsert = [
(vector, route, utterance)
for vector, route, utterance in zip(embeddings, routes, utterances)
]

vectors_to_upsert = [
PineconeRecord(values=vector, route=route, utterance=utterance).to_dict()
for vector, route, utterance in data_to_upsert
]

for i in range(0, len(vectors_to_upsert), batch_size):
batch = vectors_to_upsert[i : i + batch_size]
self._batch_upsert(batch)

return layer_routes

Check warning on line 386 in semantic_router/index/pinecone.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/index/pinecone.py#L386

Added line #L386 was not covered by tests

def _get_route_ids(self, route_name: str):
clean_route = clean_route_name(route_name)
Expand Down
15 changes: 9 additions & 6 deletions semantic_router/index/qdrant.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -160,13 +160,16 @@ def _init_collection(self) -> None:
**self.config,
)

def _remove_and_sync(self, routes_to_delete: dict):
if self.sync is not None:
logger.error("Sync remove is not implemented for LocalIndex.")

def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int):
def _add_and_sync(
self,
embeddings: List[List[float]],
routes: List[str],
utterances: List[str],
batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE,
):
if self.sync is not None:
logger.error("Sync remove is not implemented for QdrantIndex.")
logger.warning("Sync add is not implemented for QdrantIndex")
self.add(embeddings, routes, utterances, batch_size)

def add(
self,
Expand Down
59 changes: 22 additions & 37 deletions semantic_router/layer.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -222,7 +222,14 @@ def __init__(
if len(self.routes) > 0:
self._add_and_sync_routes(routes=self.routes)

Check warning on line 223 in semantic_router/layer.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/layer.py#L222-L223

Added lines #L222 - L223 were not covered by tests
else:
self._add_and_sync_routes(routes=[])
dummy_embedding = self.encoder(["dummy"])

Check warning on line 225 in semantic_router/layer.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/layer.py#L225

Added line #L225 was not covered by tests

layer_routes = self.index._add_and_sync(

Check warning on line 227 in semantic_router/layer.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/layer.py#L227

Added line #L227 was not covered by tests
embeddings=dummy_embedding,
routes=[],
utterances=[],
)
self._set_layer_routes(layer_routes)

Check warning on line 232 in semantic_router/layer.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/layer.py#L232

Added line #L232 was not covered by tests
elif len(self.routes) > 0:
self._add_routes(routes=self.routes)

Expand Down Expand Up @@ -472,57 +479,35 @@ def _refresh_routes(self):

def _add_routes(self, routes: List[Route]):
# create embeddings for all routes
route_names, all_utterances = self._extract_routes_details(routes)
all_utterances = [
utterance for route in routes for utterance in route.utterances
]
embedded_utterances = self.encoder(all_utterances)
# create route array
route_names = [route.name for route in routes for _ in route.utterances]
# add everything to the index
self.index.add(
embeddings=embedded_utterances,
routes=route_names,
utterances=all_utterances,
)


def _add_and_sync_routes(self, routes: List[Route]):
# create embeddings for all routes and sync at startup with remote ones based on sync setting
local_route_names, local_utterances = self._extract_routes_details(routes)
routes_to_add, routes_to_delete, layer_routes_dict = self.index._sync_index(
local_route_names=local_route_names,
local_utterances=local_utterances,
dimensions=len(self.encoder(["dummy"])[0])
)

logger.info(f"ROUTES TO ADD: {(routes_to_add)}")
logger.info(f"ROUTES TO DELETE: {(routes_to_delete)}")

layer_routes = [
Route(name=route, utterances=layer_routes_dict[route])
for route in layer_routes_dict.keys()
all_utterances = [
utterance for route in routes for utterance in route.utterances
]

data_to_delete: dict = {}
for route, utterance in routes_to_delete:
data_to_delete.setdefault(route, []).append(utterance)
self.index._remove_and_sync(data_to_delete)

all_utterances_to_add = [utt for _, utt in routes_to_add]
embedded_utterances_to_add = self.encoder(all_utterances_to_add) if all_utterances_to_add else []

route_names_to_add = [route for route, _, in routes_to_add]

self.index.add(
embeddings=embedded_utterances_to_add,
routes=route_names_to_add,
utterances=all_utterances_to_add,
embedded_utterances = self.encoder(all_utterances)
# create route array
route_names = [route.name for route in routes for _ in route.utterances]

Check warning on line 502 in semantic_router/layer.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/layer.py#L502

Added line #L502 was not covered by tests
# add everything to the index
layer_routes = self.index._add_and_sync(
embeddings=embedded_utterances,
routes=route_names,
utterances=all_utterances,
)

self._set_layer_routes(layer_routes)

Check warning on line 509 in semantic_router/layer.py

View check run for this annotation

Codecov / codecov/patch

semantic_router/layer.py#L509

Added line #L509 was not covered by tests

def _extract_routes_details(self, routes: List[Route]) -> Tuple[List[str], List[str]]:
route_names = [route.name for route in routes for _ in route.utterances]
utterances = [utterance for route in routes for utterance in route.utterances]
return route_names, utterances

def _encode(self, text: str) -> Any:
"""Given some text, encode it."""
# create query vector
Expand Down

0 comments on commit 3f37550

Please sign in to comment.