Inconsistent Embedding Reuse Behavior in VectorStoreIndex with DenseXRetrievalPack #14417

LikhithRishi opened this issue Jun 26, 2024 · 1 comment

LikhithRishi commented Jun 26, 2024

Question Validation

  • I have searched both the documentation and discord for an answer.

Question

class DenseXRetrievalPack(BaseLlamaPack):
    def __init__(
        self,
        documents: List[Document],
        proposition_llm: Optional[LLM] = None,
        query_llm: Optional[LLM] = None,
        embed_model: Optional[BaseEmbedding] = None,
        text_splitter: TextSplitter = SentenceSplitter(),
        vector_store: Optional[ElasticsearchStore] = None,
        similarity_top_k: int = 4,
    ) -> None:
        """Init params."""
        self._proposition_llm = proposition_llm

        embed_model = embed_model

        nodes = text_splitter.get_nodes_from_documents(documents)
        sub_nodes = self._gen_propositions(nodes)
        all_nodes = nodes + sub_nodes
        all_nodes_dict = {n.node_id: n for n in all_nodes}

        service_context = ServiceContext.from_defaults(
            llm=query_llm,
            embed_model=embed_model,
            num_output=self._proposition_llm.metadata.num_output,
        )

        if os.path.exists("./chroma_db"):
            # Reuse the persisted collection: rebuild the index from the
            # existing vector store instead of re-embedding.
            chroma_client = chromadb.PersistentClient(path="./chroma_db")
            chroma_collection = chroma_client.get_or_create_collection("quickstart")
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            self.vector_index = VectorStoreIndex.from_vector_store(
                vector_store, service_context=service_context
            )
        else:
            # First run: embed the nodes and persist them under ./chroma_db.
            chroma_client = chromadb.PersistentClient(path="./chroma_db")
            chroma_collection = chroma_client.get_or_create_collection("quickstart")
            vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            self.vector_index = VectorStoreIndex(
                nodes,
                service_context=service_context,
                show_progress=True,
                storage_context=storage_context,
                store_nodes_override=True,
            )

        self.retriever = RecursiveRetriever(
            "vector",
            retriever_dict={
                "vector": self.vector_index.as_retriever(
                    similarity_top_k=similarity_top_k
                )
            },
            node_dict=all_nodes_dict,
        )

        self.query_engine = RetrieverQueryEngine.from_args(
            self.retriever, service_context=service_context
        )

    async def _aget_proposition(self, node: TextNode) -> List[TextNode]:
        """Get proposition."""
        inital_output = await self._proposition_llm.apredict(
            PROPOSITIONS_PROMPT, node_text=node.text
        )
        outputs = inital_output.split("\n")

        all_propositions = []

        for output in outputs:
            if not output.strip():
                continue
            if not output.strip().endswith("]"):
                if not output.strip().endswith('"') and not output.strip().endswith(
                    ","
                ):
                    output = output + '"'
                output = output + " ]"
            if not output.strip().startswith("["):
                if not output.strip().startswith('"'):
                    output = '"' + output
                output = "[ " + output

            try:
                propositions = json.loads(output)
            except Exception:
                # fallback to yaml
                try:
                    propositions = yaml.safe_load(output)
                except Exception:
                    # fallback to next output
                    continue

            if not isinstance(propositions, list):
                continue

            all_propositions.extend(propositions)

        assert isinstance(all_propositions, list)
        nodes = [TextNode(text=prop) for prop in all_propositions if prop]

        return [IndexNode.from_text_node(n, node.node_id) for n in nodes]

    def _gen_propositions(self, nodes: List[TextNode]) -> List[TextNode]:
        """Get propositions."""
        sub_nodes = asyncio.run(
            run_jobs(
                [self._aget_proposition(node) for node in nodes],
                show_progress=True,
                workers=8,
            )
        )
        print(sub_nodes)

        # Flatten list
        return [node for sub_node in sub_nodes for node in sub_node]

    def get_modules(self) -> Dict[str, Any]:
        """Get modules."""
        return {
            "query_engine": self.query_engine,
            "retriever": self.retriever,
        }

    def run(self, query_str: str, **kwargs: Any) -> RESPONSE_TYPE:
        """Run the pipeline."""
        return self.query_engine.query(query_str)
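
For reference, a minimal usage sketch of the pack (the documents list, LLMs, and embedding model here are placeholders, not objects from the code above):

pack = DenseXRetrievalPack(
    documents,
    proposition_llm=proposition_llm,  # placeholder LLM
    query_llm=query_llm,              # placeholder LLM
    embed_model=embed_model,          # placeholder embedding model
)
response = pack.run("What does the report conclude?")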

In the DenseXRetrievalPack class, I'm encountering an issue with VectorStoreIndex when using different sets of nodes (nodes versus all_nodes).
Relevant Code Lines:

if os.path.exists('./chroma_db'):
    chroma_client = chromadb.PersistentClient(path="./chroma_db")
    chroma_collection = chroma_client.get_or_create_collection("quickstart")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    self.vector_index = VectorStoreIndex.from_vector_store(vector_store, service_context=service_context)
else:
    chroma_client = chromadb.PersistentClient(path="./chroma_db")
    chroma_collection = chroma_client.get_or_create_collection("quickstart")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    self.vector_index = VectorStoreIndex(
        nodes, service_context=service_context, show_progress=True, storage_context=storage_context, store_nodes_override=True
    )

Issue:
The issue occurs with both ChromaDB and Elasticsearch.
When using nodes alone, the embeddings can be reused without any issue.
The problem arises specifically when all_nodes is used, which includes both nodes and sub_nodes, resulting in the error:
ValueError: Query id d28e3de4-1c4f-420a-9184-97bf8556b11b not found in either retriever_dict or query_engine_dict.

Expected Behavior:
I expect VectorStoreIndex to handle querying using embeddings from all_nodes (nodes + sub_nodes) without errors, just as it does when using nodes alone.

Questions:

Why does the error occur when using all_nodes but not with nodes alone?
How can I modify the code to successfully reuse embeddings with all_nodes?
Is there a specific way to handle all_nodes with ChromaDB and Elasticsearch to avoid this issue?
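
For illustration, one possible direction is to persist the generated nodes next to the Chroma collection so their IDs stay stable across runs. A minimal sketch, assuming a pre-0.10 llama_index layout (to match the ServiceContext usage above) and a hypothetical docstore path:

from llama_index.storage.docstore import SimpleDocumentStore  # import path is an assumption

DOCSTORE_PATH = "./chroma_db/docstore.json"  # hypothetical location

if os.path.exists(DOCSTORE_PATH):
    # Reload previously generated nodes and sub_nodes so their IDs match
    # the embeddings already persisted in the Chroma collection.
    docstore = SimpleDocumentStore.from_persist_path(DOCSTORE_PATH)
    all_nodes = list(docstore.docs.values())
else:
    # First run: store the freshly generated nodes for later reuse.
    docstore = SimpleDocumentStore()
    docstore.add_documents(all_nodes)
    docstore.persist(DOCSTORE_PATH)

all_nodes_dict = {n.node_id: n for n in all_nodes}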


dosubot bot commented Jun 26, 2024

The discrepancy between nodes and all_nodes usage in the VectorStoreIndex of the DenseXRetrievalPack class could be due to the filtering and querying logic implemented in the monkey_query function. Specifically, the function filters nodes based on metadata and node ID restrictions before performing the query. If all_nodes is not being considered in this filtering process, it could lead to a discrepancy.

To configure VectorStoreIndex to successfully query using embeddings from all_nodes, ensure that the filtering logic in the monkey_query function includes all relevant nodes. Here is the relevant part of the code that handles the filtering and querying:

def monkey_query(
    self,
    query: VectorStoreQuery,
    **kwargs: Any,
):
    """Get nodes for response."""
    # Prevent metadata filtering on stores that were persisted without metadata.
    if (
        query.filters is not None
        and self._data.embedding_dict
        and not self._data.metadata_dict
    ):
        raise ValueError(
            "Cannot filter stores that were persisted without metadata. "
            "Please rebuild the store with metadata to enable filtering."
        )
    # Prefilter nodes based on the query filter and node ID restrictions.
    query_filter_fn = _build_metadata_filter_fn(
        lambda node_id: self._data.metadata_dict[node_id], query.filters
    )

    if query.node_ids is not None:
        available_ids = set(query.node_ids)

        def node_filter_fn(node_id: str) -> bool:
            return node_id in available_ids

    else:

        def node_filter_fn(node_id: str) -> bool:
            return True

    node_ids = []
    embeddings = []
    # TODO: consolidate with get_query_text_embedding_similarities
    for node_id, embedding in self._data.embedding_dict.items():
        if node_filter_fn(node_id) and query_filter_fn(node_id):
            node_ids.append(node_id)
            embeddings.append(embedding)

    query_embedding = cast(List[float], query.query_embedding)
    if query.mode in LEARNER_MODES:
        top_similarities, top_ids = get_top_k_embeddings_learner(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
        )
    elif query.mode == MMR_MODE:
        mmr_threshold = kwargs.get("mmr_threshold", None)
        top_similarities, top_ids = get_top_k_mmr_embeddings(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
            mmr_threshold=mmr_threshold,
        )
    elif query.mode == VectorStoreQueryMode.DEFAULT:
        top_similarities, top_ids = get_top_k_embeddings(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
        )
    elif query.mode == VectorStoreQueryMode.HYBRID:  # where I made my changes
        top_similarities, top_ids = get_top_k_embeddings(
            query_embedding,
            embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=node_ids,
        )
    else:
        raise ValueError(f"Invalid query mode: {query.mode}")

    return VectorStoreQueryResult(similarities=top_similarities, ids=top_ids)

Ensure that the node_filter_fn and query_filter_fn functions are correctly configured to include all relevant nodes from all_nodes. If all_nodes is a broader set than the current filtering allows, you may need to adjust the filtering logic to ensure it includes all necessary nodes.
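
For example, a minimal sketch of passing the IDs from all_nodes_dict through the query's node_ids so the prefilter keeps both parent nodes and proposition sub-nodes (embed_model and vector_store refer to the objects built in the pack above; restricting node_ids this way is an assumption, not required behavior):

query_embedding = embed_model.get_query_embedding("example question")
query = VectorStoreQuery(
    query_embedding=query_embedding,
    similarity_top_k=4,
    node_ids=list(all_nodes_dict.keys()),  # include nodes and sub_nodes
)
# For the monkey_query path above, node_filter_fn then admits every id in all_nodes.
result = vector_store.query(query)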

Additionally, the VectorIndexRetriever class in the llama-index-legacy/llama_index/legacy/indices/vector_store/retrievers/retriever.py module provides the logic to build and execute queries against the VectorStoreIndex. Here are the relevant parts of that class:

class VectorIndexRetriever(BaseRetriever):
    def __init__(
        self,
        index: VectorStoreIndex,
        similarity_top_k: int = DEFAULT_SIMILARITY_TOP_K,
        vector_store_query_mode: VectorStoreQueryMode = VectorStoreQueryMode.DEFAULT,
        filters: Optional[MetadataFilters] = None,
        alpha: Optional[float] = None,
        node_ids: Optional[List[str]] = None,
        doc_ids: Optional[List[str]] = None,
        sparse_top_k: Optional[int] = None,
        callback_manager: Optional[CallbackManager] = None,
        object_map: Optional[dict] = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        self._index = index
        self._vector_store = self._index.vector_store
        self._service_context = self._index.service_context
        self._docstore = self._index.docstore
        self._similarity_top_k = similarity_top_k
        self._vector_store_query_mode = VectorStoreQueryMode(vector_store_query_mode)
        self._alpha = alpha
        self._node_ids = node_ids
        self._doc_ids = doc_ids
        self._filters = filters
        self._sparse_top_k = sparse_top_k
        self._kwargs: Dict[str, Any] = kwargs.get("vector_store_kwargs", {})
        super().__init__(
            callback_manager=callback_manager, object_map=object_map, verbose=verbose
        )

    def _build_vector_store_query(
        self, query_bundle_with_embeddings: QueryBundle
    ) -> VectorStoreQuery:
        return VectorStoreQuery(
            query_embedding=query_bundle_with_embeddings.embedding,
            similarity_top_k=self._similarity_top_k,
            node_ids=self._node_ids,
            doc_ids=self._doc_ids,
            query_str=query_bundle_with_embeddings.query_str,
            mode=self._vector_store_query_mode,
            alpha=self._alpha,
            filters=self._filters,
            sparse_top_k=self._sparse_top_k,
        )

    def _build_node_list_from_query_result(
        self, query_result: VectorStoreQueryResult
    ) -> List[NodeWithScore]:
        if query_result.nodes is None:
            if query_result.ids is None:
                raise ValueError(
                    "Vector store query result should return at "
                    "least one of nodes or ids."
                )
            assert isinstance(self._index.index_struct, IndexDict)
            node_ids = [
                self._index.index_struct.nodes_dict[idx] for idx in query_result.ids
            ]
            nodes = self._docstore.get_nodes(node_ids)
            query_result.nodes = nodes
        else:
            for i in range(len(query_result.nodes)):
                source_node = query_result.nodes[i].source_node
                if (not self._vector_store.stores_text) or (
                    source_node is not None and source_node.node_type != ObjectType.TEXT
                ):
                    node_id = query_result.nodes[i].node_id
                    if self._docstore.document_exists(node_id):
                        query_result.nodes[i] = self._docstore.get_node(
                            node_id
                        )  # type: ignore[index]

        log_vector_store_query_result(query_result)

        node_with_scores: List[NodeWithScore] = []
        for ind, node in enumerate(query_result.nodes):
            score: Optional[float] = None
            if query_result.similarities is not None:
                score = query_result.similarities[ind]
            node_with_scores.append(NodeWithScore(node=node, score=score))

        return node_with_scores

    def _get_nodes_with_embeddings(
        self, query_bundle_with_embeddings: QueryBundle
    ) -> List[NodeWithScore]:
        query = self._build_vector_store_query(query_bundle_with_embeddings)
        query_result = self._vector_store.query(query, **self._kwargs)
        return self._build_node_list_from_query_result(query_result)

    async def _aget_nodes_with_embeddings(
        self, query_bundle_with_embeddings: QueryBundle
    ) -> List[NodeWithScore]:
        query = self._build_vector_store_query(query_bundle_with_embeddings)
        query_result = await self._vector_store.aquery(query, **self._kwargs)
        return self._build_node_list_from_query_result(query_result)

By using the VectorIndexRetriever class, you can ensure that the query includes all relevant nodes from all_nodes by properly configuring the node_ids and filters parameters. This will help in resolving the discrepancy and successfully querying using embeddings from all_nodes [1][2][3][4].
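
For illustration, a minimal usage sketch (vector_index and all_nodes_dict refer to the pack attributes above; restricting node_ids to all_nodes is an assumption, not the retriever's default):

retriever = VectorIndexRetriever(
    index=vector_index,
    similarity_top_k=4,
    node_ids=list(all_nodes_dict.keys()),  # nodes + sub_nodes
)
nodes_with_scores = retriever.retrieve("example question")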

