diff --git a/src/resin/knoweldge_base/base.py b/src/resin/knoweldge_base/base.py index 92ae64b0..32ca4152 100644 --- a/src/resin/knoweldge_base/base.py +++ b/src/resin/knoweldge_base/base.py @@ -12,7 +12,9 @@ class BaseKnowledgeBase(ABC, ConfigurableMixin): """ @abstractmethod - def query(self, queries: List[Query], global_metadata_filter: Optional[dict] = None + def query(self, + queries: List[Query], + global_metadata_filter: Optional[dict] = None ) -> List[QueryResult]: pass @@ -44,7 +46,6 @@ async def aquery(self, async def aupsert(self, documents: List[Document], namespace: str = "", - ) -> None: pass diff --git a/src/resin/knoweldge_base/knowledge_base.py b/src/resin/knoweldge_base/knowledge_base.py index 65e04229..c0871341 100644 --- a/src/resin/knoweldge_base/knowledge_base.py +++ b/src/resin/knoweldge_base/knowledge_base.py @@ -38,6 +38,32 @@ class KnowledgeBase(BaseKnowledgeBase): + """ + The `KnowledgeBase` is used to store and retrieve text documents, using an underlying Pinecone index. + Every document is chunked into multiple text snippets based on the text structure (e.g. Markdown or HTML formatting) + Then, each chunk is encoded into a vector using an embedding model, and the resulting vectors are inserted to the Pinecone index. + After documents were inserted, the KnowledgeBase can be queried by sending a textual query, which will first encoded to a vector + and then used to retrieve the closest top-k document chunks. + + Note: Since Resin defines its own data format, you can not use a pre-existing Pinecone index with Resin's KnowledgeBase. + The index must be created by using `knowledge_base.create_resin_index()` or the CLI command `resin new`. + + When creating a new Resin service, the user must first create the underlying Pinecone index. + This is a one-time setup process - the index will exist on Pinecone's managed service until it is deleted. + + Example: + >>> from resin.knoweldge_base.knowledge_base import KnowledgeBase + >>> from tokenizer import Tokenizer + >>> Tokenizer.initialize() + >>> kb = KnowledgeBase(index_name="my_index") + >>> kb.create_resin_index() + + In any future interactions, the user simply needs to connect to the existing index: + + >>> kb = KnowledgeBase(index_name="my_index") + >>> kb.connect() + """ # noqa: E501 + _DEFAULT_COMPONENTS = { 'record_encoder': OpenAIRecordEncoder, 'chunker': MarkdownChunker, @@ -52,6 +78,42 @@ def __init__(self, reranker: Optional[Reranker] = None, default_top_k: int = 5, ): + """ + Initilize the knowledge base object. + + If the index does not exist, the user must first create it by calling `create_resin_index()` or the CLI command `resin new`. + + Note: Resin will add the prefix --resin to your selected index name. + You can retrieve the full index name knowledge_base.index_name at any time, or find it in the Pinecone console at https://app.pinecone.io/ + + Example: + + create a new index: + >>> from resin.knoweldge_base.knowledge_base import KnowledgeBase + >>> from tokenizer import Tokenizer + >>> Tokenizer.initialize() + >>> kb = KnowledgeBase(index_name="my_index") + >>> kb.create_resin_index() + + In any future interactions, + the user simply needs to connect to the existing index: + + >>> kb = KnowledgeBase(index_name="my_index") + >>> kb.connect() + + Args: + index_name: The name of the underlying Pinecone index. + record_encoder: An instance of RecordEncoder to use for encoding documents and queries. + Defaults to OpenAIRecordEncoder. + chunker: An instance of Chunker to use for chunking documents. Defaults to MarkdownChunker. + reranker: An instance of Reranker to use for reranking query results. Defaults to TransparentReranker. + default_top_k: The default number of document chunks to return per query. Defaults to 5. + index_params: A dictionary of parameters to pass to the index creation API. Defaults to None. + see https://docs.pinecone.io/docs/python-client#create_index + + Returns: + KnowledgeBase object. + """ # noqa: E501 if default_top_k < 1: raise ValueError("default_top_k must be greater than 0") @@ -134,11 +196,33 @@ def _connection_error_msg(self) -> str: ) def connect(self) -> None: + """ + Connect to the underlying Pinecone index. + This method must be called before making any other calls to the knowledge base. + + Note: If underlying index is not provisioned yet, an exception will be raised. + To provision the index, use `create_resin_index()` or the CLI command `resin new`. + + Returns: + None + + Raises: + RuntimeError: If the knowledge base failed to connect to the underlying Pinecone index. + """ # noqa: E501 if self._index is None: self._index = self._connect_index() self.verify_index_connection() def verify_index_connection(self) -> None: + """ + Verify that the knowledge base is connected to the underlying Pinecone index. + + Returns: + None + + Raises: + RuntimeError: If the knowledge base is not connected properly to the underlying Pinecone index. + """ # noqa: E501 if self._index is None: raise RuntimeError(self._connection_error_msg) @@ -154,6 +238,39 @@ def create_resin_index(self, dimension: Optional[int] = None, index_params: Optional[dict] = None ): + """ + Creates the underlying Pinecone index that will be used by the KnowledgeBase. + This is a one time set-up operation that only needs to be done once for every new Resin service. + After the index was created, it will persist in Pinecone until explicitly deleted. + + Since Resin defines its own data format, namely a few dedicated metadata fields, + you can not use a pre-existing Pinecone index with Resin's KnowledgeBase. + The index must be created by using `knowledge_base.create_resin_index()` or the CLI command `resin new`. + + Note: This operation may take a few minutes to complete. + Once created, you can see the index in the Pinecone console + + Note: Resin will add the prefix --resin to your selected index name. + You can retrieve the full index name knowledge_base.index_name at any time. + Or find it in the Pinecone console at https://app.pinecone.io/ + + Args: + indexed_fields: A list of metadata fields that would be indexed, + allowing them to be later used for metadata filtering. + All other metadata fields are stored but not indexed. + See: https://docs.pinecone.io/docs/manage-indexes#selective-metadata-indexing. + Resin always indexes a built-in `document_id` field, which is added to every vector. + By default - all other metadata fields are **not** indexed, unless explicitly defined in this list. + dimension: The dimension of the vectors to index. + If `dimension` isn't explicitly provided, + Resin would try to infer the embedding's dimension based on the configured `Encoder` + index_params: A dictionary of parameters to pass to the index creation API. + For example, you can set the index's number of replicas by passing {"replicas": 2}. + see https://docs.pinecone.io/docs/python-client#create_index + + Returns: + None + """ # noqa: E501 # validate inputs if indexed_fields is None: indexed_fields = ['document_id'] @@ -226,9 +343,23 @@ def _get_full_index_name(index_name: str) -> str: @property def index_name(self) -> str: + """ + The name of the index the knowledge base is connected to. + """ return self._index_name def delete_index(self): + """ + Delete the underlying Pinecone index. + + **Note: THIS OPERATION IS NOT REVERSIBLE!!** + Once deleted, the index together with any stored documents cannot be restored! + + After deletion - the `KnowledgeBase` would not be connected to a Pinecone index anymore, + so you will not be able to insert documents or query. + If you'd wish to re-create an index with the same name, simply call `knowledge_base.create_resin_index()` + Or use the CLI command `resin new`. + """ # noqa: E501 if self._index is None: raise RuntimeError(self._connection_error_msg) delete_index(self._index_name) @@ -238,6 +369,35 @@ def query(self, queries: List[Query], global_metadata_filter: Optional[dict] = None ) -> List[QueryResult]: + """ + Query the knowledge base to retrieve document chunks. + + This operation includes several steps: + 1. Encode the queries to vectors using the underlying encoder. + 2. Query the underlying Pinecone index to retrieve the top-k chunks for each query. + 3. Rerank the results using the underlying reranker. + 4. Return the results for each query as a list of QueryResult objects. + + Args: + queries: A list of queries to run against the knowledge base. + global_metadata_filter: A metadata filter to apply to all queries, in addition to any query-specific filters. + For example, the filter {"website": "wiki"} will only return documents with the metadata {"website": "wiki"} (in case provided in upsert) + see https://docs.pinecone.io/docs/metadata-filtering + Returns: + A list of QueryResult objects. + + Examples: + >>> from resin.knoweldge_base.knowledge_base import KnowledgeBase + >>> from tokenizer import Tokenizer + >>> Tokenizer.initialize() + >>> kb = KnowledgeBase(index_name="my_index") + >>> kb.connect() + >>> queries = [Query(text="How to make a cake"), + Query(text="How to make a pizza", + top_k=10, + metadata_filter={"website": "wiki"})] + >>> results = kb.query(queries) + """ # noqa: E501 if self._index is None: raise RuntimeError(self._connection_error_msg) @@ -298,6 +458,43 @@ def upsert(self, documents: List[Document], namespace: str = "", batch_size: int = 100): + """ + Upsert documents into the knowledge base. + Upsert operation stands for "update or insert". + It means that if a document with the same id already exists in the index, it will be updated with the new document. + Otherwise, a new document will be inserted. + + This operation includes several steps: + 1. Split the documents into smaller chunks. + 2. Encode the chunks to vectors. + 3. Delete any existing chunks belonging to the same documents. + 4. Upsert the chunks to the index. + + Args: + documents: A list of documents to upsert. + namespace: The namespace in the underlying index to upsert documents into. + batch_size: Refers only to the actual upsert operation to the underlying index. + The number of chunks (multiple piecies of text per document) to upsert in each batch. + + Returns: + None + + Example: + >>> from resin.knoweldge_base.knowledge_base import KnowledgeBase + >>> from tokenizer import Tokenizer + >>> Tokenizer.initialize() + >>> kb = KnowledgeBase(index_name="my_index") + >>> kb.connect() + >>> documents = [Document(id="doc1", + text="This is a document", + source="my_source", + metadata={"website": "wiki"}), + Document(id="doc2", + text="This is another document", + source="my_source", + metadata={"website": "wiki"})] + >>> kb.upsert(documents) + """ # noqa: E501 if self._index is None: raise RuntimeError(self._connection_error_msg) @@ -342,14 +539,37 @@ def upsert(self, # Upsert to Pinecone index dataset.to_pinecone_index(self._index_name, namespace=namespace, - should_create_index=False) + should_create_index=False, + batch_size=batch_size) def delete(self, document_ids: List[str], namespace: str = "") -> None: + """ + Delete documents from the underlying Pinecone index. + Since each document is chunked into multiple chunks, this operation will delete all chunks belonging to the given document ids. + This operation not raise an exception if the document does not exist. + + Args: + document_ids: A list of document ids to delete from the index. + namespace: The namespace in the underlying index to delete documents from. + + Returns: + None + + Example: + >>> from resin.knoweldge_base.knowledge_base import KnowledgeBase + >>> from tokenizer import Tokenizer + >>> Tokenizer.initialize() + >>> kb = KnowledgeBase(index_name="my_index") + >>> kb.connect() + >>> kb.delete(document_ids=["doc1", "doc2"]) + """ # noqa: E501 if self._index is None: raise RuntimeError(self._connection_error_msg) + # Currently starter env does not support delete by metadata filter + # So temporarily we delete the first DELETE_STARTER_CHUNKS_PER_DOC chunks if self._is_starter_env(): for i in range(0, len(document_ids), DELETE_STARTER_BATCH_SIZE): doc_ids_chunk = document_ids[i:i + DELETE_STARTER_BATCH_SIZE] @@ -371,10 +591,20 @@ def delete(self, ) @classmethod - def from_config(cls, config: Dict[str, Any], index_name: Optional[str] = None): - # The same logical config can be used by multiple Resin service instances, - # differing only by the index name. For that reason, `index_name` is recommended - # to be passed explicitly or as environment variable, not in the config file + def from_config(cls, + config: Dict[str, Any], + index_name: Optional[str] = None) -> 'KnowledgeBase': + """ + Create a KnowledgeBase object from a configuration dictionary. + + Args: + config: A dictionary containing the configuration for the knowledge base. + index_name: The name of the index to connect to (optional). + If not provided, the index name will be read from the environment variable INDEX_NAME. + + Returns: + A KnowledgeBase object. + """ # noqa: E501 index_name = index_name or os.getenv("INDEX_NAME") if index_name is None: raise ValueError(