diff --git a/comps/cores/proto/api_protocol.py b/comps/cores/proto/api_protocol.py index 6a32023ecb..97b621f6e6 100644 --- a/comps/cores/proto/api_protocol.py +++ b/comps/cores/proto/api_protocol.py @@ -269,6 +269,9 @@ class ChatCompletionRequest(BaseModel): # define request_type: Literal["chat"] = "chat" + # key index name + key_index_name: Optional[str] = None + class DocSumChatCompletionRequest(ChatCompletionRequest): summary_type: str = "auto" # can be "auto", "stuff", "truncate", "map_reduce", "refine" diff --git a/comps/cores/proto/docarray.py b/comps/cores/proto/docarray.py index 64b0b19fd2..2a0cd727bb 100644 --- a/comps/cores/proto/docarray.py +++ b/comps/cores/proto/docarray.py @@ -102,6 +102,7 @@ class EmbedDoc(BaseDoc): lambda_mult: float = 0.5 score_threshold: float = 0.2 constraints: Optional[Union[Dict[str, Any], List[Dict[str, Any]], None]] = None + index_name: Optional[str] = None class EmbedMultimodalDoc(EmbedDoc): @@ -225,6 +226,7 @@ class LLMParams(BaseDoc): repetition_penalty: float = 1.03 stream: bool = True language: str = "auto" # can be "en", "zh" + key_index_name: Optional[str] = None chat_template: Optional[str] = Field( default=None, diff --git a/comps/dataprep/src/integrations/redis.py b/comps/dataprep/src/integrations/redis.py index 586769b0e5..7c7a5b434e 100644 --- a/comps/dataprep/src/integrations/redis.py +++ b/comps/dataprep/src/integrations/redis.py @@ -102,11 +102,11 @@ def format_redis_conn_from_env(): redis_pool = redis.ConnectionPool.from_url(REDIS_URL) -async def check_index_existance(client): +def check_index_existance(client, index_name: str = KEY_INDEX_NAME): if logflag: logger.info(f"[ check index existence ] checking {client}") try: - results = await client.search("*") + results = client.search("*") if logflag: logger.info(f"[ check index existence ] index of client exists: {client}") return results @@ -116,12 +116,12 @@ async def check_index_existance(client): return None -async def create_index(client, index_name: str = KEY_INDEX_NAME): +def create_index(client, index_name: str = KEY_INDEX_NAME): if logflag: logger.info(f"[ create index ] creating index {index_name}") try: definition = IndexDefinition(index_type=IndexType.HASH, prefix=["file:"]) - await client.create_index((TextField("file_name"), TextField("key_ids")), definition=definition) + client.create_index((TextField("file_name"), TextField("key_ids")), definition=definition) if logflag: logger.info(f"[ create index ] index {index_name} successfully created") except Exception as e: @@ -131,16 +131,18 @@ async def create_index(client, index_name: str = KEY_INDEX_NAME): return True -async def store_by_id(client, key, value): +def store_by_id(client, key, value): if logflag: - logger.info(f"[ store by id ] storing ids of {key}") + logger.info(f"[ store by id ] storing ids of {client.index_name + '_' + key}") try: - await client.add_document(doc_id="file:" + key, file_name=key, key_ids=value) + client.add_document( + doc_id="file:" + client.index_name + "_" + key, file_name=client.index_name + "_" + key, key_ids=value + ) if logflag: - logger.info(f"[ store by id ] store document success. id: file:{key}") + logger.info(f"[ store by id ] store document success. id: file:{client.index_name + '_' + key}") except Exception as e: if logflag: - logger.info(f"[ store by id ] fail to store document file:{key}: {e}") + logger.info(f"[ store by id ] fail to store document file:{client.index_name + '_' + key}: {e}") return False return True @@ -185,9 +187,32 @@ def delete_by_id(client, id): return True -async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder): +def ingest_chunks_to_redis(file_name: str, chunks: List): + KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys") if logflag: - logger.info(f"[ redis ingest chunks ] file name: {file_name}") + logger.info(f"[ redis ingest chunks ] file name: '{file_name}' to '{KEY_INDEX_NAME}' index.") + # Create vectorstore + if TEI_EMBEDDING_ENDPOINT: + if not HUGGINGFACEHUB_API_TOKEN: + raise HTTPException( + status_code=400, + detail="You MUST offer the `HUGGINGFACEHUB_API_TOKEN` when using `TEI_EMBEDDING_ENDPOINT`.", + ) + import requests + + response = requests.get(TEI_EMBEDDING_ENDPOINT + "/info") + if response.status_code != 200: + raise HTTPException( + status_code=400, detail=f"TEI embedding endpoint {TEI_EMBEDDING_ENDPOINT} is not available." + ) + model_id = response.json()["model_id"] + # create embeddings using TEI endpoint service + embedder = HuggingFaceInferenceAPIEmbeddings( + api_key=HUGGINGFACEHUB_API_TOKEN, model_name=model_id, api_url=TEI_EMBEDDING_ENDPOINT + ) + else: + # create embeddings using local embedding model + embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL) # Batch size batch_size = 32 @@ -200,13 +225,13 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder): batch_chunks = chunks[i : i + batch_size] batch_texts = batch_chunks - _, keys = await asyncio.to_thread( - Redis.from_texts_return_keys, + _, keys = Redis.from_texts_return_keys( texts=batch_texts, embedding=embedder, - index_name=INDEX_NAME, + index_name=KEY_INDEX_NAME, redis_url=REDIS_URL, ) + keys = [k.replace(KEY_INDEX_NAME, KEY_INDEX_NAME + "_" + file_name) for k in keys] if logflag: logger.info(f"[ redis ingest chunks ] keys: {keys}") file_ids.extend(keys) @@ -214,13 +239,15 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder): logger.info(f"[ redis ingest chunks ] Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") # store file_ids into index file-keys - r = await aioredis.from_url(REDIS_URL) + r = redis.Redis(connection_pool=redis_pool) client = r.ft(KEY_INDEX_NAME) - if not await check_index_existance(client): - await create_index(client) + + if not check_index_existance(client): + assert create_index(client, index_name=KEY_INDEX_NAME) try: - await store_by_id(client, key=file_name, value="#".join(file_ids)) + assert store_by_id(client, key=file_name, value="#".join(file_ids)) + except Exception as e: if logflag: logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.") @@ -228,7 +255,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder): return True -async def ingest_data_to_redis(doc_path: DocPath, embedder): +def ingest_data_to_redis(doc_path: DocPath): """Ingest document to Redis.""" path = doc_path.path if logflag: @@ -249,7 +276,7 @@ async def ingest_data_to_redis(doc_path: DocPath, embedder): separators=get_separators(), ) - content = await document_loader(path) + content = document_loader(path) if logflag: logger.info("[ redis ingest data ] file content loaded") @@ -259,7 +286,7 @@ async def ingest_data_to_redis(doc_path: DocPath, embedder): if ext in structured_types: chunks = content else: - chunks = await asyncio.to_thread(text_splitter.split_text, content) + chunks = text_splitter.split_text(content) ### Specially processing for the table content in PDFs if doc_path.process_table and path.endswith(".pdf"): @@ -269,7 +296,8 @@ async def ingest_data_to_redis(doc_path: DocPath, embedder): logger.info(f"[ redis ingest data ] Done preprocessing. Created {len(chunks)} chunks of the given file.") file_name = doc_path.path.split("/")[-1] - return await ingest_chunks_to_redis(file_name, chunks, embedder) + + return ingest_chunks_to_redis(file_name, chunks) @OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS") @@ -282,54 +310,26 @@ class OpeaRedisDataprep(OpeaComponent): def __init__(self, name: str, description: str, config: dict = None): super().__init__(name, ServiceType.DATAPREP.name.lower(), description, config) - self.client = redis.Redis(connection_pool=redis_pool) - self.data_index_client, self.key_index_client = asyncio.run(self._initialize_client()) - self.embedder = asyncio.run(self._initialize_embedder()) + self.client = self._initialize_client() + self.data_index_client = self.client.ft(INDEX_NAME) + self.key_index_client = self.client.ft(KEY_INDEX_NAME) health_status = self.check_health() if not health_status: logger.error("OpeaRedisDataprep health check failed.") - async def _initialize_client(self) -> redis.Redis: + def _initialize_client(self) -> redis.Redis: if logflag: logger.info("[ initialize client ] initializing redis client...") """Initializes the redis client.""" try: - client = await aioredis.from_url(REDIS_URL) - data_index_client = client.ft(INDEX_NAME) - key_index_client = client.ft(KEY_INDEX_NAME) - return data_index_client, key_index_client + client = redis.Redis(connection_pool=redis_pool) + return client except Exception as e: logger.error(f"fail to initialize redis client: {e}") return None - async def _initialize_embedder(self): - if TEI_EMBEDDING_ENDPOINT: - if not HUGGINGFACEHUB_API_TOKEN: - raise HTTPException( - status_code=400, - detail="You MUST offer the `HUGGINGFACEHUB_API_TOKEN` when using `TEI_EMBEDDING_ENDPOINT`.", - ) - - import httpx - - async with httpx.AsyncClient() as client: - response = await client.get(TEI_EMBEDDING_ENDPOINT + "/info") - if response.status_code != 200: - raise HTTPException( - status_code=400, detail=f"TEI embedding endpoint {TEI_EMBEDDING_ENDPOINT} is not available." - ) - model_id = response.json()["model_id"] - # create embeddings using TEI endpoint service - embedder = HuggingFaceInferenceAPIEmbeddings( - api_key=HUGGINGFACEHUB_API_TOKEN, model_name=model_id, api_url=TEI_EMBEDDING_ENDPOINT - ) - else: - # create embeddings using local embedding model - embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL) - return embedder - - async def check_health(self) -> bool: + def check_health(self) -> bool: """Checks the health of the dataprep service. Returns: @@ -357,7 +357,6 @@ async def ingest_files( chunk_overlap: int = Form(100), process_table: bool = Form(False), table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), ): """Ingest files/links content into redis database. @@ -375,6 +374,14 @@ async def ingest_files( logger.info(f"[ redis ingest ] files:{files}") logger.info(f"[ redis ingest ] link_list:{link_list}") + KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys") + if KEY_INDEX_NAME != "file-keys": + logger.info( + f"KEY_INDEX_NAME: {KEY_INDEX_NAME} is different than the default one. Setting up the parameters." + ) + self.data_index_client = self.client.ft(INDEX_NAME) + self.key_index_client = self.client.ft(KEY_INDEX_NAME) + if files: if not isinstance(files, list): files = [files] @@ -382,35 +389,38 @@ async def ingest_files( for file in files: encode_file = encode_filename(file.filename) - doc_id = "file:" + encode_file + doc_id = "file:" + KEY_INDEX_NAME + "_" + encode_file if logflag: logger.info(f"[ redis ingest ] processing file {doc_id}") - # check whether the file already exists - key_ids = None - try: - key_ids = search_by_id(self.key_index_client, doc_id).key_ids - if logflag: - logger.info(f"[ redis ingest] File {file.filename} already exists.") - except Exception as e: - logger.info(f"[ redis ingest] File {file.filename} does not exist.") - if key_ids: - raise HTTPException( - status_code=400, - detail=f"Uploaded file {file.filename} already exists. Please change file name.", - ) + if KEY_INDEX_NAME in self.get_list_of_indices(): + # check whether the file already exists + key_ids = None + try: + key_ids = search_by_id(self.key_index_client, doc_id).key_ids + if logflag: + logger.info( + f"[ redis ingest] File '{file.filename}' already exists in '{KEY_INDEX_NAME}' index." + ) + except Exception as e: + logger.info(f"[ redis ingest] File {file.filename} does not exist.") + if key_ids: + raise HTTPException( + status_code=400, + detail=f"Uploaded file '{file.filename}' already exists in '{KEY_INDEX_NAME}' index. Please change file name or 'index_name'.", + ) save_path = upload_folder + encode_file await save_content_to_local_disk(save_path, file) - await ingest_data_to_redis( + + ingest_data_to_redis( DocPath( path=save_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap, process_table=process_table, table_strategy=table_strategy, - ), - self.embedder, + ) ) uploaded_files.append(save_path) if logflag: @@ -447,15 +457,14 @@ async def ingest_files( save_path = upload_folder + encoded_link + ".txt" content = parse_html_new([link], chunk_size=chunk_size, chunk_overlap=chunk_overlap) await save_content_to_local_disk(save_path, content) - await ingest_data_to_redis( + ingest_data_to_redis( DocPath( path=save_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap, process_table=process_table, table_strategy=table_strategy, - ), - self.embedder, + ) ) if logflag: logger.info(f"[ redis ingest] Successfully saved link list {link_list}") @@ -463,7 +472,7 @@ async def ingest_files( raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") - async def get_files(self): + async def get_files(self, key_index_name=KEY_INDEX_NAME): """Get file structure from redis database in the format of { "name": "File Name", @@ -472,6 +481,8 @@ async def get_files(self): "parent": "", }""" + if key_index_name is None: + key_index_name = KEY_INDEX_NAME if logflag: logger.info("[ redis get ] start to get file structure") @@ -479,20 +490,20 @@ async def get_files(self): file_list = [] # check index existence - res = await check_index_existance(self.key_index_client) + res = key_index_name in self.get_list_of_indices() if not res: if logflag: - logger.info(f"[ redis get ] index {KEY_INDEX_NAME} does not exist") + logger.info(f"[ redis get ] index {key_index_name} does not exist") return file_list while True: response = self.client.execute_command( - "FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE + "FT.SEARCH", key_index_name, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE ) # no doc retrieved if len(response) < 2: break - file_list = format_search_results(response, file_list) + file_list = format_search_results(response, key_index_name, file_list) offset += SEARCH_BATCH_SIZE # last batch if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE: @@ -517,7 +528,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True)): logger.info("[ redis delete ] delete all files") # drop index KEY_INDEX_NAME - if await check_index_existance(self.key_index_client): + if check_index_existance(self.key_index_client): try: assert drop_index(index_name=KEY_INDEX_NAME) except Exception as e: @@ -528,7 +539,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True)): logger.info(f"[ redis delete ] Index {KEY_INDEX_NAME} does not exits.") # drop index INDEX_NAME - if await check_index_existance(self.data_index_client): + if check_index_existance(self.data_index_client): try: assert drop_index(index_name=INDEX_NAME) except Exception as e: @@ -620,3 +631,15 @@ async def delete_files(self, file_path: str = Body(..., embed=True)): if logflag: logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.") raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.") + + def get_list_of_indices(self): + """Retrieves a list of all indices from the Redis client. + + Returns: + A list of index names as strings. + """ + # Execute the command to list all indices + indices = self.client.execute_command("FT._LIST") + # Decode each index name from bytes to string + indices_list = [item.decode("utf-8") for item in indices] + return indices_list diff --git a/comps/dataprep/src/opea_dataprep_loader.py b/comps/dataprep/src/opea_dataprep_loader.py index 8ec1042f8d..be4a15d9d7 100644 --- a/comps/dataprep/src/opea_dataprep_loader.py +++ b/comps/dataprep/src/opea_dataprep_loader.py @@ -32,6 +32,11 @@ async def delete_files(self, *args, **kwargs): logger.info("[ dataprep loader ] delete files") return await self.component.delete_files(*args, **kwargs) + async def get_list_of_indices(self, *args, **kwargs): + if logflag: + logger.info("[ dataprep loader ] get indices") + return self.component.get_list_of_indices(*args, **kwargs) + class OpeaDataprepMultiModalLoader(OpeaComponentLoader): def __init__(self, component_name, **kwargs): diff --git a/comps/dataprep/src/opea_dataprep_microservice.py b/comps/dataprep/src/opea_dataprep_microservice.py index 8c132954b6..ff3c19db2f 100644 --- a/comps/dataprep/src/opea_dataprep_microservice.py +++ b/comps/dataprep/src/opea_dataprep_microservice.py @@ -55,19 +55,21 @@ async def ingest_files( chunk_overlap: int = Form(100), process_table: bool = Form(False), table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), + key_index_name: Optional[str] = Form(None), ): start = time.time() + if key_index_name: + # Set key_input_name to environment variable + os.environ["KEY_INDEX_NAME"] = key_index_name + if logflag: logger.info(f"[ ingest ] files:{files}") logger.info(f"[ ingest ] link_list:{link_list}") try: # Use the loader to invoke the component - response = await loader.ingest_files( - files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB - ) + response = await loader.ingest_files(files, link_list, chunk_size, chunk_overlap, process_table, table_strategy) # Log the result if logging is enabled if logflag: logger.info(f"[ ingest ] Output generated: {response}") @@ -87,7 +89,7 @@ async def ingest_files( port=5000, ) @register_statistics(names=["opea_service@dataprep"]) -async def get_files(): +async def get_files(key_index_name: Optional[str] = File(None)): start = time.time() if logflag: @@ -95,7 +97,7 @@ async def get_files(): try: # Use the loader to invoke the component - response = await loader.get_files() + response = await loader.get_files(key_index_name) # Log the result if logging is enabled if logflag: logger.info(f"[ get ] ingested files: {response}") @@ -135,6 +137,36 @@ async def delete_files(file_path: str = Body(..., embed=True)): raise +@register_microservice( + name="opea_service@dataprep", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/indices", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep"]) +async def get_list_of_indices(): + start = time.time() + if logflag: + logger.info("[ get ] start to get list of indices.") + + try: + # Use the loader to invoke the component + response = await loader.get_list_of_indices() + + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] list of indices: {response}") + + # Record statistics + statistics_dict["opea_service@dataprep"].append_latency(time.time() - start, None) + + return response + except Exception as e: + logger.error(f"Error during dataprep get list of indices: {e}") + raise + + if __name__ == "__main__": logger.info("OPEA Dataprep Microservice is starting...") create_upload_folder(upload_folder) diff --git a/comps/dataprep/src/utils.py b/comps/dataprep/src/utils.py index 5d3ded4fc9..39b6d4a68f 100644 --- a/comps/dataprep/src/utils.py +++ b/comps/dataprep/src/utils.py @@ -838,14 +838,14 @@ def get_file_structure(root_path: str, parent_path: str = "") -> List[Dict[str, return result -def format_search_results(response, file_list: list): +def format_search_results(response, key_index_name, file_list: list): for i in range(1, len(response), 2): - file_name = response[i].decode()[5:] + file_name = response[i].decode()[4:] file_dict = { "name": decode_filename(file_name), "id": decode_filename(file_name), "type": "File", - "parent": "", + "parent": key_index_name, } file_list.append(file_dict) return file_list diff --git a/comps/retrievers/src/integrations/redis.py b/comps/retrievers/src/integrations/redis.py index f71a0ae5f2..38ef629499 100644 --- a/comps/retrievers/src/integrations/redis.py +++ b/comps/retrievers/src/integrations/redis.py @@ -56,16 +56,16 @@ def __init__(self, name: str, description: str, config: dict = None): if not health_status: logger.error("OpeaRedisRetriever health check failed.") - def _initialize_client(self) -> Redis: + def _initialize_client(self, index_name=INDEX_NAME) -> Redis: """Initializes the redis client.""" try: if BRIDGE_TOWER_EMBEDDING: logger.info(f"generate multimodal redis instance with {BRIDGE_TOWER_EMBEDDING}") client = Redis( - embedding=self.embeddings, index_name=INDEX_NAME, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL + embedding=self.embeddings, index_name=index_name, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL ) else: - client = Redis(embedding=self.embeddings, index_name=INDEX_NAME, redis_url=REDIS_URL) + client = Redis(embedding=self.embeddings, index_name=index_name, redis_url=REDIS_URL) return client except Exception as e: logger.error(f"fail to initialize redis client: {e}") @@ -101,6 +101,9 @@ async def invoke( if logflag: logger.info(input) + if input.index_name: + self.client = self._initialize_client(index_name=input.index_name) + # check if the Redis index has data if self.client.client.keys() == []: search_res = []