From 6be25eb661083e387b422f9d08cef722e98bd208 Mon Sep 17 00:00:00 2001 From: Mustafa Date: Mon, 3 Feb 2025 22:46:06 -0800 Subject: [PATCH 1/5] initial commit Signed-off-by: Mustafa --- .../src/integrations/redis_multimodal.py | 38 ++ comps/dataprep/src/opea_dataprep_loader.py | 7 + .../opea_dataprep_multimodal_microservice.py | 9 + comps/dataprep/src/requirements.txt | 5 +- wip_opea_dataprep_multimodal_microservice.py | 326 ++++++++++++++++++ 5 files changed, 383 insertions(+), 2 deletions(-) create mode 100644 wip_opea_dataprep_multimodal_microservice.py diff --git a/comps/dataprep/src/integrations/redis_multimodal.py b/comps/dataprep/src/integrations/redis_multimodal.py index 6ae4d185bc..81d9404620 100644 --- a/comps/dataprep/src/integrations/redis_multimodal.py +++ b/comps/dataprep/src/integrations/redis_multimodal.py @@ -7,6 +7,8 @@ import shutil import time import uuid +import redis + from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Type, Union @@ -105,6 +107,8 @@ def format_redis_conn_from_env(): logger = CustomLogger("opea_dataprep_redis_multimodal") logflag = os.getenv("LOGFLAG", False) +redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=os.getenv("REDIS_PASSWORD", None)) + class MultimodalRedis(Redis): """Redis vector database to process multimodal data.""" @@ -192,6 +196,11 @@ def from_text_image_pairs_return_keys( if images else instance.add_text(texts, metadatas, keys=keys) ) + + logger.info(" >>>>>>> from_text_image_pairs_return_keys .... ") + print("keys: ", keys) + # print("instance: ", instance) + return instance, keys def add_text_image_pairs( @@ -465,6 +474,16 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): annotation, path_to_frames, filename ) + + # # Vector Index Configuration + # print(">>>>>>> INDEX_NAME:", INDEX_NAME) + # INDEX_NAME = os.getenv("INDEX_NAME", "mm-rag-redis") + # print(">>>>>>> INDEX_NAME:", INDEX_NAME) + + INDEX_NAME = os.getenv("INDEX_NAME", "mm-rag-redis") + + print(">>>>>>> INDEX_NAME:", INDEX_NAME) + MultimodalRedis.from_text_image_pairs_return_keys( texts=[f"From {filename}. 
" + text for text in text_list], images=image_list, @@ -475,6 +494,15 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): redis_url=REDIS_URL, ) + def get_list_of_indexes(self, redis_client=redis_client): + # Use the FT._LIST command to get the list of indexes + indexes = redis_client.execute_command('FT._LIST') + indexes_list = [item.decode('utf-8').strip("'") for item in indexes] + print(">>>>>>>> >>>>>> indexes_list:", indexes_list) + + return indexes_list + + def drop_index(self, index_name, redis_url=REDIS_URL): logger.info(f"dropping index {index_name}") try: @@ -648,6 +676,9 @@ async def ingest_generate_captions(self, files: List[UploadFile] = File(None)): raise HTTPException(status_code=400, detail="Must provide at least one file.") async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): + + logger.info(">>>>>>>>>>>> redis multimodal - Redis Ingest Files <<<<<<<<<<<<") + if files: accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf"] # Create a lookup dictionary containing all media files @@ -706,6 +737,7 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]] uploaded_files_map[file_name] = media_file_name if file_extension == ".pdf": + # Set up location to store pdf images and text, reusing "frames" and "annotations" from video output_dir = os.path.join(self.upload_folder, media_dir_name) os.makedirs(output_dir, exist_ok=True) @@ -745,6 +777,9 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]] "sub_video_id": image_idx, } ) + + print(">>>> len(annotations)",len(annotations)) + # print(">>>> annotations:", annotations) with open(os.path.join(output_dir, "annotations.json"), "w") as f: json.dump(annotations, f) @@ -753,6 +788,7 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]] self.ingest_multimodal( file_name, os.path.join(self.upload_folder, media_dir_name), self.embeddings, is_pdf=True ) + else: # Save caption file in upload directory caption_file_extension = os.path.splitext(matched_files[media_file][1].filename)[1] @@ -793,6 +829,8 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]] async def get_files(self): """Returns list of names of uploaded videos saved on the server.""" + logger.info(f">>>>>>> get_files .... 
from self.upload_folder:: {self.upload_folder}") + if not Path(self.upload_folder).exists(): logger.info("No file uploaded, return empty list.") return [] diff --git a/comps/dataprep/src/opea_dataprep_loader.py b/comps/dataprep/src/opea_dataprep_loader.py index 8ec1042f8d..10756cbf08 100644 --- a/comps/dataprep/src/opea_dataprep_loader.py +++ b/comps/dataprep/src/opea_dataprep_loader.py @@ -79,3 +79,10 @@ async def delete_files(self, *args, **kwargs): if logflag: logger.info("[ dataprep loader ] delete files") return await self.component.delete_files(*args, **kwargs) + + async def get_list_of_indexes(self, *args, **kwargs): + if logflag: + logger.info("[ dataprep loader ] get indexes") + return self.component.get_list_of_indexes(*args, **kwargs) + # return await self.component.get_list_of_indexes(*args, **kwargs) + \ No newline at end of file diff --git a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py index 9fbb562a17..1378c11f76 100644 --- a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py +++ b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py @@ -43,6 +43,15 @@ @register_statistics(names=["opea_service@dataprep_multimodal"]) async def ingest_files(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): start = time.time() + + logger.info(">>>>>>>>>>>>>> this is /v1/dataprep/ingest") + logger.info(f"[ ingest ] files:{files}") + logger.info(f"[ component ] files:{dataprep_component_name}") + + DATAPREP_MMR_PORT = os.getenv("DATAPREP_MMR_PORT") # , "OPEA_DATAPREP_MULTIMODALVDMS") + logger.info(f"[ DATAPREP_MMR_PORT ] files:{DATAPREP_MMR_PORT}") + + logger.info("------------------------------------------") if logflag: logger.info(f"[ ingest ] files:{files}") diff --git a/comps/dataprep/src/requirements.txt b/comps/dataprep/src/requirements.txt index b2c7f02fbb..ba12387772 100644 --- a/comps/dataprep/src/requirements.txt +++ b/comps/dataprep/src/requirements.txt @@ -1,7 +1,8 @@ beautifulsoup4 cairosvg decord -docarray[full] +docarray +# [full] docx2txt easyocr einops @@ -40,7 +41,7 @@ pgvector==0.2.5 Pillow pinecone-client prometheus-fastapi-instrumentator -psycopg2 +#psycopg2 pymupdf pyspark pytesseract diff --git a/wip_opea_dataprep_multimodal_microservice.py b/wip_opea_dataprep_multimodal_microservice.py new file mode 100644 index 0000000000..017ec06a57 --- /dev/null +++ b/wip_opea_dataprep_multimodal_microservice.py @@ -0,0 +1,326 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + +import os +import time +from typing import List, Optional, Union + +from fastapi import Body, File, UploadFile +from comps.dataprep.src.integrations.redis_multimodal import OpeaMultimodalRedisDataprep +from comps.dataprep.src.integrations.vdms_multimodal import OpeaMultimodalVdmsDataprep +from comps.dataprep.src.opea_dataprep_loader import OpeaDataprepMultiModalLoader + +from comps import ( + CustomLogger, + ServiceType, + opea_microservices, + register_microservice, + register_statistics, + statistics_dict, +) +from comps.dataprep.src.utils import create_upload_folder + +logger = CustomLogger("opea_dataprep_multimodal_microservice") +logflag = os.getenv("LOGFLAG", False) +upload_folder = "./uploaded_files/" + +dataprep_component_name = os.getenv("DATAPREP_COMPONENT_NAME", "OPEA_DATAPREP_MULTIMODALVDMS") +# Initialize OpeaComponentLoader +loader = OpeaDataprepMultiModalLoader( + dataprep_component_name, + description=f"OPEA DATAPREP Multimodal Component: 
{dataprep_component_name}", +) + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/ingest", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def ingest_files(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), + index_name: Optional[str] = File(None) + # index_name: Optional[str]=str(None), + ): + start = time.time() + + logger.info(">>>>>>>>>>>>>> this is /v1/dataprep/ingest") + logger.info(f"[index_name] :{index_name}") + + if index_name: + print(">>>>>>>> table name will be updated .... ") + print(f"INDEX_NAME: {os.environ['INDEX_NAME']}") + # Set an environment variable + os.environ['INDEX_NAME'] = index_name + print(f"INDEX_NAME: {os.environ['INDEX_NAME']}") + + + logger.info(f"[ ingest ] files:{files}") + + logger.info(f"[ component ] files:{dataprep_component_name}") + + DATAPREP_MMR_PORT = os.getenv("DATAPREP_MMR_PORT") + logger.info(f"[ DATAPREP_MMR_PORT ] files:{DATAPREP_MMR_PORT}") + + logger.info("------------------------------------------") + + if logflag: + logger.info(f"[ ingest ] files:{files}") + + try: + # Use the loader to invoke the component + response = await loader.ingest_files(files) + # Log the result if logging is enabled + if logflag: + logger.info(f"[ ingest ] Output generated: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep ingest files invocation: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/ingest_videos", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def ingest_videos(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): + start = time.time() + + if logflag: + logger.info(f"[ ingest ] files:{files}") + + try: + # Use the loader to invoke the component + response = await loader.ingest_videos(files) + # Log the result if logging is enabled + if logflag: + logger.info(f"[ ingest ] Output generated: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep ingest videos invocation: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/generate_transcripts", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def ingest_generate_transcripts(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): + start = time.time() + + if logflag: + logger.info(f"[ ingest ] files:{files}") + try: + # Use the loader to invoke the component + response = await loader.ingest_generate_transcripts(files) + # Log the result if logging is enabled + if logflag: + logger.info(f"[ ingest ] Output generated: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep generate_transcripts invocation: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + 
endpoint="/v1/dataprep/generate_captions", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def ingest_generate_captions(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): + start = time.time() + + if logflag: + logger.info(f"[ ingest ] files:{files}") + + try: + # Use the loader to invoke the component + response = await loader.ingest_generate_captions(files) + # Log the result if logging is enabled + if logflag: + logger.info(f"[ ingest ] Output generated: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep generate_captions invocation: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/get", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def get_files(): + start = time.time() + + # logger.info(f"Processing video input at {_endpoint}/v1/video2audio") + + if logflag: + logger.info("[ get ] start to get ingested files") + + try: + # Use the loader to invoke the component + response = await loader.get_files() + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] ingested files: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep get files invocation: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/get/{filename}", + host="0.0.0.0", + port=5000, + methods=["GET"], +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def get_one_file(filename: str): + start = time.time() + + if logflag: + logger.info("[ get ] start to get ingested files") + + try: + # Use the loader to invoke the component + response = await loader.get_one_file(filename) + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] ingested files: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep get one file invocation: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/get_videos", + host="0.0.0.0", + port=5000, + methods=["GET"], +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def get_videos(): + start = time.time() + + if logflag: + logger.info("[ get ] start to get ingested files") + + try: + # Use the loader to invoke the component + response = await loader.get_videos() + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] ingested files: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep get videos invocation: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/delete", + host="0.0.0.0", + port=5000, +) 
+@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def delete_files(file_path: str = Body(..., embed=True)): + start = time.time() + + if logflag: + logger.info("[ delete ] start to delete ingested files") + + try: + # Use the loader to invoke the component + response = await loader.delete_files(file_path) + # Log the result if logging is enabled + if logflag: + logger.info(f"[ delete ] deleted result: {response}") + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + return response + except Exception as e: + logger.error(f"Error during dataprep delete invocation: {e}") + raise + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/indexes", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def get_list_of_indexes(): + start = time.time() + + logger.info(">>>>> [ get ] start to get list of indexes.") + + if logflag: + logger.info("[ get ] start to get list of indexes.") + + try: + # Use the loader to invoke the component + # response = await loader.get_files() + print("=============================================================") + response = await loader.get_list_of_indexes() + + logger.info(f"[ get ] list of indexes: {response}") + print("=============================================================") + + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] list of indexes: {response}") + + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + + return response + except Exception as e: + logger.error(f"Error during dataprep get list of indexes: {e}") + raise + + +if __name__ == "__main__": + logger.info("OPEA Dataprep Multimodal Microservice is starting...") + create_upload_folder(upload_folder) + opea_microservices["opea_service@dataprep_multimodal"].start() From e63af46e9e7c911bd54ea1a01e87d9303f00c692 Mon Sep 17 00:00:00 2001 From: Mustafa Date: Thu, 6 Feb 2025 14:20:44 -0800 Subject: [PATCH 2/5] updates for retriever microservices Signed-off-by: Mustafa --- comps/cores/proto/docarray.py | 2 +- .../src/integrations/redis_multimodal.py | 64 ++++++++------ comps/dataprep/src/opea_dataprep_loader.py | 13 +-- .../opea_dataprep_multimodal_microservice.py | 84 ++++++++++++++++--- comps/retrievers/src/integrations/redis.py | 13 +-- wip_opea_dataprep_multimodal_microservice.py | 66 ++++++++------- 6 files changed, 162 insertions(+), 80 deletions(-) diff --git a/comps/cores/proto/docarray.py b/comps/cores/proto/docarray.py index 5a86d3c90a..55d840350b 100644 --- a/comps/cores/proto/docarray.py +++ b/comps/cores/proto/docarray.py @@ -102,7 +102,7 @@ class EmbedDoc(BaseDoc): lambda_mult: float = 0.5 score_threshold: float = 0.2 constraints: Optional[Union[Dict[str, Any], List[Dict[str, Any]], None]] = None - + index_name: Optional[str] = None class EmbedMultimodalDoc(EmbedDoc): # extend EmbedDoc with these attributes diff --git a/comps/dataprep/src/integrations/redis_multimodal.py b/comps/dataprep/src/integrations/redis_multimodal.py index 81d9404620..47719282b6 100644 --- a/comps/dataprep/src/integrations/redis_multimodal.py +++ b/comps/dataprep/src/integrations/redis_multimodal.py @@ -197,10 +197,6 @@ def from_text_image_pairs_return_keys( else instance.add_text(texts, metadatas, keys=keys) ) - logger.info(" >>>>>>> from_text_image_pairs_return_keys .... 
") - print("keys: ", keys) - # print("instance: ", instance) - return instance, keys def add_text_image_pairs( @@ -473,17 +469,9 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): text_list, image_list, metadatas = self.prepare_data_and_metadata_from_annotation( annotation, path_to_frames, filename ) - - - # # Vector Index Configuration - # print(">>>>>>> INDEX_NAME:", INDEX_NAME) - # INDEX_NAME = os.getenv("INDEX_NAME", "mm-rag-redis") - # print(">>>>>>> INDEX_NAME:", INDEX_NAME) INDEX_NAME = os.getenv("INDEX_NAME", "mm-rag-redis") - print(">>>>>>> INDEX_NAME:", INDEX_NAME) - MultimodalRedis.from_text_image_pairs_return_keys( texts=[f"From {filename}. " + text for text in text_list], images=image_list, @@ -493,16 +481,47 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): index_schema=INDEX_SCHEMA, redis_url=REDIS_URL, ) + + def get_list_of_indices(self, redis_client=redis_client): + """ + Retrieves a list of all indices from the Redis client. - def get_list_of_indexes(self, redis_client=redis_client): - # Use the FT._LIST command to get the list of indexes - indexes = redis_client.execute_command('FT._LIST') - indexes_list = [item.decode('utf-8').strip("'") for item in indexes] - print(">>>>>>>> >>>>>> indexes_list:", indexes_list) + Args: + redis_client: The Redis client instance to use for executing commands. - return indexes_list + Returns: + A list of index names as strings. + """ + # Execute the command to list all indices + indices = redis_client.execute_command('FT._LIST') + # Decode each index name from bytes to string and strip any surrounding single quotes + indices_list = [item.decode('utf-8').strip("'") for item in indices] + return indices_list + + def get_items_of_index(self, index_name=INDEX_NAME, redis_client=redis_client): + """ + Retrieves items from a specific index in Redis. + + Args: + index_name: The name of the index to search. + redis_client: The Redis client instance to use for executing commands. + + Returns: + A sorted list of items from the specified index. 
+ """ + # Execute the command to search for all items in the specified index + results = redis_client.execute_command(f'FT.SEARCH {index_name} {"*"} LIMIT 0 100') + list_of_items = [] + # Iterate through the results + for r in results: + if isinstance(r, list): + # Extract and decode the item where 'source_video' is found in the value + list_of_items.append( + [r[i+1].decode('utf-8') for i, v in enumerate(r) if 'source_video' in str(v)][0] + ) + # Return the sorted list of items + return sorted(list_of_items) - def drop_index(self, index_name, redis_url=REDIS_URL): logger.info(f"dropping index {index_name}") try: @@ -677,8 +696,6 @@ async def ingest_generate_captions(self, files: List[UploadFile] = File(None)): async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): - logger.info(">>>>>>>>>>>> redis multimodal - Redis Ingest Files <<<<<<<<<<<<") - if files: accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf"] # Create a lookup dictionary containing all media files @@ -778,9 +795,6 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]] } ) - print(">>>> len(annotations)",len(annotations)) - # print(">>>> annotations:", annotations) - with open(os.path.join(output_dir, "annotations.json"), "w") as f: json.dump(annotations, f) @@ -828,8 +842,6 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]] async def get_files(self): """Returns list of names of uploaded videos saved on the server.""" - - logger.info(f">>>>>>> get_files .... from self.upload_folder:: {self.upload_folder}") if not Path(self.upload_folder).exists(): logger.info("No file uploaded, return empty list.") diff --git a/comps/dataprep/src/opea_dataprep_loader.py b/comps/dataprep/src/opea_dataprep_loader.py index 10756cbf08..aa8cf0d745 100644 --- a/comps/dataprep/src/opea_dataprep_loader.py +++ b/comps/dataprep/src/opea_dataprep_loader.py @@ -80,9 +80,12 @@ async def delete_files(self, *args, **kwargs): logger.info("[ dataprep loader ] delete files") return await self.component.delete_files(*args, **kwargs) - async def get_list_of_indexes(self, *args, **kwargs): + async def get_list_of_indices(self, *args, **kwargs): if logflag: - logger.info("[ dataprep loader ] get indexes") - return self.component.get_list_of_indexes(*args, **kwargs) - # return await self.component.get_list_of_indexes(*args, **kwargs) - \ No newline at end of file + logger.info("[ dataprep loader ] get indices") + return self.component.get_list_of_indices(*args, **kwargs) + + async def get_items_of_index(self, *args, **kwargs): + if logflag: + logger.info("[ dataprep loader ] get items of the index") + return self.component.get_items_of_index(*args, **kwargs) \ No newline at end of file diff --git a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py index 1378c11f76..ce2cad6c2f 100644 --- a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py +++ b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py @@ -7,9 +7,9 @@ from typing import List, Optional, Union from fastapi import Body, File, UploadFile -from integrations.redis_multimodal import OpeaMultimodalRedisDataprep -from integrations.vdms_multimodal import OpeaMultimodalVdmsDataprep -from opea_dataprep_loader import OpeaDataprepMultiModalLoader +from comps.dataprep.src.integrations.redis_multimodal import OpeaMultimodalRedisDataprep +from comps.dataprep.src.integrations.vdms_multimodal import 
OpeaMultimodalVdmsDataprep +from comps.dataprep.src.opea_dataprep_loader import OpeaDataprepMultiModalLoader from comps import ( CustomLogger, @@ -41,18 +41,15 @@ port=5000, ) @register_statistics(names=["opea_service@dataprep_multimodal"]) -async def ingest_files(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): +async def ingest_files(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), + index_name: Optional[str] = File(None) + ): start = time.time() - - logger.info(">>>>>>>>>>>>>> this is /v1/dataprep/ingest") - logger.info(f"[ ingest ] files:{files}") - logger.info(f"[ component ] files:{dataprep_component_name}") - - DATAPREP_MMR_PORT = os.getenv("DATAPREP_MMR_PORT") # , "OPEA_DATAPREP_MULTIMODALVDMS") - logger.info(f"[ DATAPREP_MMR_PORT ] files:{DATAPREP_MMR_PORT}") - logger.info("------------------------------------------") - + if index_name: + # Set an environment variable + os.environ['INDEX_NAME'] = index_name + if logflag: logger.info(f"[ ingest ] files:{files}") @@ -266,6 +263,67 @@ async def delete_files(file_path: str = Body(..., embed=True)): logger.error(f"Error during dataprep delete invocation: {e}") raise +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/indices", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def get_list_of_indices(): + start = time.time() + + if logflag: + logger.info("[ get ] start to get list of indices.") + + try: + # Use the loader to invoke the component + response = await loader.get_list_of_indices() + + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] list of indices: {response}") + + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + + return response + except Exception as e: + logger.error(f"Error during dataprep get list of indices: {e}") + raise + + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/items_of_index", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def get_items_of_index(index_name: Optional[str] = File(None) ): + start = time.time() + + if logflag: + logger.info(f"[ get ] start to get items of index:{index_name}.") + + try: + # Use the loader to invoke the component + response = await loader.get_items_of_index(index_name) + + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] items of index: {response}") + + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) + + return response + except Exception as e: + logger.error(f"Error during dataprep get list of indexes: {e}") + raise + if __name__ == "__main__": logger.info("OPEA Dataprep Multimodal Microservice is starting...") diff --git a/comps/retrievers/src/integrations/redis.py b/comps/retrievers/src/integrations/redis.py index f71a0ae5f2..d38ae0db66 100644 --- a/comps/retrievers/src/integrations/redis.py +++ b/comps/retrievers/src/integrations/redis.py @@ -56,16 +56,16 @@ def __init__(self, name: str, description: str, config: dict = None): if not health_status: logger.error("OpeaRedisRetriever health check failed.") - def _initialize_client(self) -> Redis: + def _initialize_client(self, index_name = INDEX_NAME) -> Redis: """Initializes the redis client.""" try: if BRIDGE_TOWER_EMBEDDING: 
logger.info(f"generate multimodal redis instance with {BRIDGE_TOWER_EMBEDDING}") client = Redis( - embedding=self.embeddings, index_name=INDEX_NAME, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL + embedding=self.embeddings, index_name=index_name, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL ) else: - client = Redis(embedding=self.embeddings, index_name=INDEX_NAME, redis_url=REDIS_URL) + client = Redis(embedding=self.embeddings, index_name=index_name, redis_url=REDIS_URL) return client except Exception as e: logger.error(f"fail to initialize redis client: {e}") @@ -100,6 +100,9 @@ async def invoke( """ if logflag: logger.info(input) + + if input.index_name: + self.client = self._initialize_client(index_name=input.index_name) # check if the Redis index has data if self.client.client.keys() == []: @@ -140,7 +143,5 @@ async def invoke( else: raise ValueError(f"{input.search_type} not valid") - if logflag: - logger.info(search_res) - return search_res + return search_res \ No newline at end of file diff --git a/wip_opea_dataprep_multimodal_microservice.py b/wip_opea_dataprep_multimodal_microservice.py index 017ec06a57..f773ccc1aa 100644 --- a/wip_opea_dataprep_multimodal_microservice.py +++ b/wip_opea_dataprep_multimodal_microservice.py @@ -43,30 +43,13 @@ @register_statistics(names=["opea_service@dataprep_multimodal"]) async def ingest_files(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), index_name: Optional[str] = File(None) - # index_name: Optional[str]=str(None), ): start = time.time() - - logger.info(">>>>>>>>>>>>>> this is /v1/dataprep/ingest") - logger.info(f"[index_name] :{index_name}") - + if index_name: - print(">>>>>>>> table name will be updated .... ") - print(f"INDEX_NAME: {os.environ['INDEX_NAME']}") # Set an environment variable os.environ['INDEX_NAME'] = index_name - print(f"INDEX_NAME: {os.environ['INDEX_NAME']}") - - logger.info(f"[ ingest ] files:{files}") - - logger.info(f"[ component ] files:{dataprep_component_name}") - - DATAPREP_MMR_PORT = os.getenv("DATAPREP_MMR_PORT") - logger.info(f"[ DATAPREP_MMR_PORT ] files:{DATAPREP_MMR_PORT}") - - logger.info("------------------------------------------") - if logflag: logger.info(f"[ ingest ] files:{files}") @@ -285,31 +268,56 @@ async def delete_files(file_path: str = Body(..., embed=True)): @register_microservice( name="opea_service@dataprep_multimodal", service_type=ServiceType.DATAPREP, - endpoint="/v1/dataprep/indexes", + endpoint="/v1/dataprep/indices", host="0.0.0.0", port=5000, ) @register_statistics(names=["opea_service@dataprep_multimodal"]) -async def get_list_of_indexes(): +async def get_list_of_indices(): start = time.time() - logger.info(">>>>> [ get ] start to get list of indexes.") - if logflag: - logger.info("[ get ] start to get list of indexes.") + logger.info("[ get ] start to get list of indices.") try: # Use the loader to invoke the component - # response = await loader.get_files() - print("=============================================================") - response = await loader.get_list_of_indexes() + response = await loader.get_list_of_indices() + + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] list of indices: {response}") + + # Record statistics + statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) - logger.info(f"[ get ] list of indexes: {response}") - print("=============================================================") + return response + except Exception as e: + logger.error(f"Error during dataprep get 
list of indices: {e}") + raise + +@register_microservice( + name="opea_service@dataprep_multimodal", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/items_of_index", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep_multimodal"]) +async def get_items_of_index(index_name: Optional[str] = File(None) ): + start = time.time() + + #TODO : make sure index is already in the db + if logflag: + logger.info(f"[ get ] start to get items of index:{index_name}.") + + try: + # Use the loader to invoke the component + response = await loader.get_items_of_index(index_name) + # Log the result if logging is enabled if logflag: - logger.info(f"[ get ] list of indexes: {response}") + logger.info(f"[ get ] items of index: {response}") # Record statistics statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) From cea8c3dc8cd2e1fa6135036ef60fa953808855c0 Mon Sep 17 00:00:00 2001 From: Mustafa Date: Fri, 7 Feb 2025 18:33:55 -0800 Subject: [PATCH 3/5] update redis.py Signed-off-by: Mustafa --- comps/dataprep/src/integrations/redis.py | 76 +++++++++++++++++++--- comps/dataprep/src/opea_dataprep_loader.py | 22 +++---- 2 files changed, 77 insertions(+), 21 deletions(-) diff --git a/comps/dataprep/src/integrations/redis.py b/comps/dataprep/src/integrations/redis.py index 06cb0d7f27..5d44e8a323 100644 --- a/comps/dataprep/src/integrations/redis.py +++ b/comps/dataprep/src/integrations/redis.py @@ -98,12 +98,12 @@ def format_redis_conn_from_env(): REDIS_URL = format_redis_conn_from_env() redis_pool = redis.ConnectionPool.from_url(REDIS_URL) - -def check_index_existance(client): +def check_index_existance(client, index_name: str = KEY_INDEX_NAME): if logflag: logger.info(f"[ check index existence ] checking {client}") try: results = client.search("*") + if logflag: logger.info(f"[ check index existence ] index of client exists: {client}") return results @@ -114,11 +114,16 @@ def check_index_existance(client): def create_index(client, index_name: str = KEY_INDEX_NAME): + + print(">>>>>>>>>>>>>> create_index - index_name:", index_name) + if logflag: logger.info(f"[ create index ] creating index {index_name}") try: definition = IndexDefinition(index_type=IndexType.HASH, prefix=["file:"]) client.create_index((TextField("file_name"), TextField("key_ids")), definition=definition) + # client.create_index((TextField("index_name"), TextField("file_name"), TextField("key_ids")), definition=definition) + if logflag: logger.info(f"[ create index ] index {index_name} successfully created") except Exception as e: @@ -183,8 +188,15 @@ def delete_by_id(client, id): def ingest_chunks_to_redis(file_name: str, chunks: List): + KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME") + + print(">>>>>>>>>>>>>>>>>>>> INDEX_NAME:", INDEX_NAME) + print(">>>>>>>>>>>>>>>>>>>> KEY_INDEX_NAME:", KEY_INDEX_NAME) + print(">>>>>>>>>>>>>>>>>>>> file_name:", file_name) + + if logflag: - logger.info(f"[ redis ingest chunks ] file name: {file_name}") + logger.info(f"[ redis ingest chunks ] file name: '{file_name}' to '{KEY_INDEX_NAME}' index.") # Create vectorstore if TEI_EMBEDDING_ENDPOINT: # create embeddings using TEI endpoint service @@ -207,7 +219,7 @@ def ingest_chunks_to_redis(file_name: str, chunks: List): _, keys = Redis.from_texts_return_keys( texts=batch_texts, embedding=embedder, - index_name=INDEX_NAME, + index_name=KEY_INDEX_NAME, redis_url=REDIS_URL, ) if logflag: @@ -215,15 +227,17 @@ def ingest_chunks_to_redis(file_name: str, chunks: List): file_ids.extend(keys) 
if logflag: logger.info(f"[ redis ingest chunks ] Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}") - + # store file_ids into index file-keys r = redis.Redis(connection_pool=redis_pool) client = r.ft(KEY_INDEX_NAME) + if not check_index_existance(client): - assert create_index(client) - + assert create_index(client, index_name=KEY_INDEX_NAME) + try: assert store_by_id(client, key=file_name, value="#".join(file_ids)) + except Exception as e: if logflag: logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.") @@ -272,6 +286,7 @@ def ingest_data_to_redis(doc_path: DocPath): logger.info(f"[ redis ingest data ] Done preprocessing. Created {len(chunks)} chunks of the given file.") file_name = doc_path.path.split("/")[-1] + return ingest_chunks_to_redis(file_name, chunks) @@ -348,7 +363,9 @@ async def ingest_files( if logflag: logger.info(f"[ redis ingest ] files:{files}") logger.info(f"[ redis ingest ] link_list:{link_list}") - + + KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME") + if files: if not isinstance(files, list): files = [files] @@ -363,19 +380,21 @@ async def ingest_files( # check whether the file already exists key_ids = None try: + #TODO: =====> this needs to be checked .... key_ids = search_by_id(self.key_index_client, doc_id).key_ids if logflag: - logger.info(f"[ redis ingest] File {file.filename} already exists.") + logger.info(f"[ redis ingest] File '{file.filename}' already exists in '{KEY_INDEX_NAME}' index.") except Exception as e: logger.info(f"[ redis ingest] File {file.filename} does not exist.") if key_ids: raise HTTPException( status_code=400, - detail=f"Uploaded file {file.filename} already exists. Please change file name.", + detail=f"Uploaded file '{file.filename}' already exists in '{KEY_INDEX_NAME}' index. Please change file name or 'index_name'.", ) save_path = upload_folder + encode_file await save_content_to_local_disk(save_path, file) + ingest_data_to_redis( DocPath( path=save_path, @@ -592,3 +611,40 @@ async def delete_files(self, file_path: str = Body(..., embed=True)): if logflag: logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.") raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.") + + def get_list_of_indices(self): + """ + Retrieves a list of all indices from the Redis client. + + Returns: + A list of index names as strings. + """ + # Execute the command to list all indices + indices = self.client.execute_command('FT._LIST') + # Decode each index name from bytes to string + indices_list = [item.decode('utf-8') for item in indices] + return indices_list + + def get_items_of_index(self, index_name=INDEX_NAME): # , redis_client=redis_client): + """ + Retrieves items from a specific index in Redis. + + Args: + index_name: The name of the index to search. + redis_client: The Redis client instance to use for executing commands. + + Returns: + A sorted list of items from the specified index. 
+ """ + # Execute the command to search for all items in the specified index + results = self.client.execute_command(f'FT.SEARCH {index_name} {"*"} LIMIT 0 100') + list_of_items = [] + # Iterate through the results + for r in results: + if isinstance(r, list): + # Extract and decode the item where 'source_video' is found in the value + list_of_items.append( + [r[i+1].decode('utf-8') for i, v in enumerate(r) if 'source_video' in str(v)][0] + ) + # Return the sorted list of items + return sorted(list_of_items) \ No newline at end of file diff --git a/comps/dataprep/src/opea_dataprep_loader.py b/comps/dataprep/src/opea_dataprep_loader.py index aa8cf0d745..694047bb81 100644 --- a/comps/dataprep/src/opea_dataprep_loader.py +++ b/comps/dataprep/src/opea_dataprep_loader.py @@ -31,7 +31,17 @@ async def delete_files(self, *args, **kwargs): if logflag: logger.info("[ dataprep loader ] delete files") return await self.component.delete_files(*args, **kwargs) - + + async def get_list_of_indices(self, *args, **kwargs): + if logflag: + logger.info("[ dataprep loader ] get indices") + return self.component.get_list_of_indices(*args, **kwargs) + + async def get_items_of_index(self, *args, **kwargs): + if logflag: + logger.info("[ dataprep loader ] get items of the index") + return self.component.get_items_of_index(*args, **kwargs) + class OpeaDataprepMultiModalLoader(OpeaComponentLoader): def __init__(self, component_name, **kwargs): @@ -79,13 +89,3 @@ async def delete_files(self, *args, **kwargs): if logflag: logger.info("[ dataprep loader ] delete files") return await self.component.delete_files(*args, **kwargs) - - async def get_list_of_indices(self, *args, **kwargs): - if logflag: - logger.info("[ dataprep loader ] get indices") - return self.component.get_list_of_indices(*args, **kwargs) - - async def get_items_of_index(self, *args, **kwargs): - if logflag: - logger.info("[ dataprep loader ] get items of the index") - return self.component.get_items_of_index(*args, **kwargs) \ No newline at end of file From 0cd0fcc6b2c7f8b7b551eaae667c0e5bef84c8b3 Mon Sep 17 00:00:00 2001 From: Mustafa Date: Thu, 13 Feb 2025 10:04:13 -0800 Subject: [PATCH 4/5] add retrievers update Signed-off-by: Mustafa --- comps/dataprep/src/integrations/redis.py | 97 ++--- .../src/integrations/redis_multimodal.py | 11 + comps/dataprep/src/opea_dataprep_loader.py | 7 +- .../src/opea_dataprep_microservice.py | 57 ++- comps/dataprep/src/requirements.txt | 5 +- comps/dataprep/src/utils.py | 6 +- wip_opea_dataprep_multimodal_microservice.py | 334 ------------------ 7 files changed, 98 insertions(+), 419 deletions(-) delete mode 100644 wip_opea_dataprep_multimodal_microservice.py diff --git a/comps/dataprep/src/integrations/redis.py b/comps/dataprep/src/integrations/redis.py index 5d44e8a323..c623918b72 100644 --- a/comps/dataprep/src/integrations/redis.py +++ b/comps/dataprep/src/integrations/redis.py @@ -114,9 +114,6 @@ def check_index_existance(client, index_name: str = KEY_INDEX_NAME): def create_index(client, index_name: str = KEY_INDEX_NAME): - - print(">>>>>>>>>>>>>> create_index - index_name:", index_name) - if logflag: logger.info(f"[ create index ] creating index {index_name}") try: @@ -135,14 +132,14 @@ def create_index(client, index_name: str = KEY_INDEX_NAME): def store_by_id(client, key, value): if logflag: - logger.info(f"[ store by id ] storing ids of {key}") + logger.info(f"[ store by id ] storing ids of {client.index_name + '_' + key}") try: - client.add_document(doc_id="file:" + key, file_name=key, key_ids=value) + 
client.add_document(doc_id="file:" + client.index_name + '_' + key, file_name=client.index_name + '_' + key, key_ids=value) if logflag: - logger.info(f"[ store by id ] store document success. id: file:{key}") + logger.info(f"[ store by id ] store document success. id: file:{client.index_name + '_' + key}") except Exception as e: if logflag: - logger.info(f"[ store by id ] fail to store document file:{key}: {e}") + logger.info(f"[ store by id ] fail to store document file:{client.index_name + '_' + key}: {e}") return False return True @@ -188,13 +185,7 @@ def delete_by_id(client, id): def ingest_chunks_to_redis(file_name: str, chunks: List): - KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME") - - print(">>>>>>>>>>>>>>>>>>>> INDEX_NAME:", INDEX_NAME) - print(">>>>>>>>>>>>>>>>>>>> KEY_INDEX_NAME:", KEY_INDEX_NAME) - print(">>>>>>>>>>>>>>>>>>>> file_name:", file_name) - - + KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys") if logflag: logger.info(f"[ redis ingest chunks ] file name: '{file_name}' to '{KEY_INDEX_NAME}' index.") # Create vectorstore @@ -222,6 +213,7 @@ def ingest_chunks_to_redis(file_name: str, chunks: List): index_name=KEY_INDEX_NAME, redis_url=REDIS_URL, ) + keys = [k.replace(KEY_INDEX_NAME, KEY_INDEX_NAME + '_' + file_name) for k in keys] if logflag: logger.info(f"[ redis ingest chunks ] keys: {keys}") file_ids.extend(keys) @@ -363,9 +355,13 @@ async def ingest_files( if logflag: logger.info(f"[ redis ingest ] files:{files}") logger.info(f"[ redis ingest ] link_list:{link_list}") - - KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME") + KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys") + if KEY_INDEX_NAME != "file-keys": + logger.info(f"KEY_INDEX_NAME: {KEY_INDEX_NAME} is different than the default one. Setting up the parameters.") + self.data_index_client = self.client.ft(INDEX_NAME) + self.key_index_client = self.client.ft(KEY_INDEX_NAME) + if files: if not isinstance(files, list): files = [files] @@ -373,24 +369,24 @@ async def ingest_files( for file in files: encode_file = encode_filename(file.filename) - doc_id = "file:" + encode_file + doc_id = "file:" + KEY_INDEX_NAME + '_' + encode_file if logflag: logger.info(f"[ redis ingest ] processing file {doc_id}") - # check whether the file already exists - key_ids = None - try: - #TODO: =====> this needs to be checked .... - key_ids = search_by_id(self.key_index_client, doc_id).key_ids - if logflag: - logger.info(f"[ redis ingest] File '{file.filename}' already exists in '{KEY_INDEX_NAME}' index.") - except Exception as e: - logger.info(f"[ redis ingest] File {file.filename} does not exist.") - if key_ids: - raise HTTPException( - status_code=400, - detail=f"Uploaded file '{file.filename}' already exists in '{KEY_INDEX_NAME}' index. Please change file name or 'index_name'.", - ) + if KEY_INDEX_NAME in self.get_list_of_indices(): + # check whether the file already exists + key_ids = None + try: + key_ids = search_by_id(self.key_index_client, doc_id).key_ids + if logflag: + logger.info(f"[ redis ingest] File '{file.filename}' already exists in '{KEY_INDEX_NAME}' index.") + except Exception as e: + logger.info(f"[ redis ingest] File {file.filename} does not exist.") + if key_ids: + raise HTTPException( + status_code=400, + detail=f"Uploaded file '{file.filename}' already exists in '{KEY_INDEX_NAME}' index. 
Please change file name or 'index_name'.", + ) save_path = upload_folder + encode_file await save_content_to_local_disk(save_path, file) @@ -454,7 +450,7 @@ async def ingest_files( raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") - async def get_files(self): + async def get_files(self, key_index_name=KEY_INDEX_NAME): """Get file structure from redis database in the format of { "name": "File Name", @@ -462,7 +458,9 @@ async def get_files(self): "type": "File", "parent": "", }""" - + + if key_index_name is None: + key_index_name = KEY_INDEX_NAME if logflag: logger.info("[ redis get ] start to get file structure") @@ -470,20 +468,20 @@ async def get_files(self): file_list = [] # check index existence - res = check_index_existance(self.key_index_client) + res = key_index_name in self.get_list_of_indices() if not res: if logflag: - logger.info(f"[ redis get ] index {KEY_INDEX_NAME} does not exist") + logger.info(f"[ redis get ] index {key_index_name} does not exist") return file_list while True: response = self.client.execute_command( - "FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE + "FT.SEARCH", key_index_name, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE ) # no doc retrieved if len(response) < 2: break - file_list = format_search_results(response, file_list) + file_list = format_search_results(response, key_index_name, file_list) offset += SEARCH_BATCH_SIZE # last batch if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE: @@ -624,27 +622,4 @@ def get_list_of_indices(self): # Decode each index name from bytes to string indices_list = [item.decode('utf-8') for item in indices] return indices_list - - def get_items_of_index(self, index_name=INDEX_NAME): # , redis_client=redis_client): - """ - Retrieves items from a specific index in Redis. - - Args: - index_name: The name of the index to search. - redis_client: The Redis client instance to use for executing commands. - - Returns: - A sorted list of items from the specified index. 
- """ - # Execute the command to search for all items in the specified index - results = self.client.execute_command(f'FT.SEARCH {index_name} {"*"} LIMIT 0 100') - list_of_items = [] - # Iterate through the results - for r in results: - if isinstance(r, list): - # Extract and decode the item where 'source_video' is found in the value - list_of_items.append( - [r[i+1].decode('utf-8') for i, v in enumerate(r) if 'source_video' in str(v)][0] - ) - # Return the sorted list of items - return sorted(list_of_items) \ No newline at end of file + \ No newline at end of file diff --git a/comps/dataprep/src/integrations/redis_multimodal.py b/comps/dataprep/src/integrations/redis_multimodal.py index 47719282b6..916968b349 100644 --- a/comps/dataprep/src/integrations/redis_multimodal.py +++ b/comps/dataprep/src/integrations/redis_multimodal.py @@ -334,6 +334,10 @@ class OpeaMultimodalRedisDataprep(OpeaComponent): def __init__(self, name: str, description: str, config: dict = None): super().__init__(name, ServiceType.DATAPREP.name.lower(), description, config) + + print(">>>>>>>>>>>>>>>>>>>> OpeaMultimodalRedisDataprep - __init__ name:", name) + print(">>>>>>>>>>>>>>>>>>>> OpeaMultimodalRedisDataprep - __init__ description:", description) + self.device = "cpu" self.upload_folder = "./uploaded_files/" # Load embeddings model @@ -459,6 +463,8 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): path_to_frames = os.path.join(data_folder, "frames") annotation = load_json_file(annotation_file_path) + + print(">>>>>>>>>>>>>>>>>>>> ingest_multimodal - annotation:", annotation) # prepare data to ingest if is_pdf: @@ -472,6 +478,10 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): INDEX_NAME = os.getenv("INDEX_NAME", "mm-rag-redis") + print(">>>>>>>>>>>>>>>>>>>> ingest_multimodal - filename:", filename) + print(">>>>>>>>>>>>>>>>>>>> ingest_multimodal - text_list:", len(text_list)) + print() + MultimodalRedis.from_text_image_pairs_return_keys( texts=[f"From {filename}. 
" + text for text in text_list], images=image_list, @@ -496,6 +506,7 @@ def get_list_of_indices(self, redis_client=redis_client): indices = redis_client.execute_command('FT._LIST') # Decode each index name from bytes to string and strip any surrounding single quotes indices_list = [item.decode('utf-8').strip("'") for item in indices] + print(">>>>>>>>>>>>>>>>>>>> redis mm - get_list_of_indices ", indices_list) return indices_list def get_items_of_index(self, index_name=INDEX_NAME, redis_client=redis_client): diff --git a/comps/dataprep/src/opea_dataprep_loader.py b/comps/dataprep/src/opea_dataprep_loader.py index 694047bb81..41e94a06ab 100644 --- a/comps/dataprep/src/opea_dataprep_loader.py +++ b/comps/dataprep/src/opea_dataprep_loader.py @@ -36,12 +36,7 @@ async def get_list_of_indices(self, *args, **kwargs): if logflag: logger.info("[ dataprep loader ] get indices") return self.component.get_list_of_indices(*args, **kwargs) - - async def get_items_of_index(self, *args, **kwargs): - if logflag: - logger.info("[ dataprep loader ] get items of the index") - return self.component.get_items_of_index(*args, **kwargs) - + class OpeaDataprepMultiModalLoader(OpeaComponentLoader): def __init__(self, component_name, **kwargs): diff --git a/comps/dataprep/src/opea_dataprep_microservice.py b/comps/dataprep/src/opea_dataprep_microservice.py index 7dda2879d4..1c5b8b3aca 100644 --- a/comps/dataprep/src/opea_dataprep_microservice.py +++ b/comps/dataprep/src/opea_dataprep_microservice.py @@ -7,16 +7,16 @@ from typing import List, Optional, Union from fastapi import Body, File, Form, UploadFile -from integrations.elasticsearch import OpeaElasticSearchDataprep -from integrations.milvus import OpeaMilvusDataprep -from integrations.neo4j_llamaindex import OpeaNeo4jLlamaIndexDataprep -from integrations.opensearch import OpeaOpenSearchDataprep -from integrations.pgvect import OpeaPgvectorDataprep -from integrations.pipecone import OpeaPineConeDataprep -from integrations.qdrant import OpeaQdrantDataprep -from integrations.redis import OpeaRedisDataprep -from integrations.vdms import OpeaVdmsDataprep -from opea_dataprep_loader import OpeaDataprepLoader +from comps.dataprep.src.integrations.elasticsearch import OpeaElasticSearchDataprep +from comps.dataprep.src.integrations.milvus import OpeaMilvusDataprep +from comps.dataprep.src.integrations.neo4j_llamaindex import OpeaNeo4jLlamaIndexDataprep +from comps.dataprep.src.integrations.opensearch import OpeaOpenSearchDataprep +from comps.dataprep.src.integrations.pgvect import OpeaPgvectorDataprep +from comps.dataprep.src.integrations.pipecone import OpeaPineConeDataprep +from comps.dataprep.src.integrations.qdrant import OpeaQdrantDataprep +from comps.dataprep.src.integrations.redis import OpeaRedisDataprep +from comps.dataprep.src.integrations.vdms import OpeaVdmsDataprep +from comps.dataprep.src.opea_dataprep_loader import OpeaDataprepLoader from comps import ( CustomLogger, @@ -55,8 +55,13 @@ async def ingest_files( chunk_overlap: int = Form(100), process_table: bool = Form(False), table_strategy: str = Form("fast"), + key_index_name: Optional[str] = Form(None), ): start = time.time() + + if key_index_name: + # Set key_input_name to environment variable + os.environ['KEY_INDEX_NAME'] = key_index_name if logflag: logger.info(f"[ ingest ] files:{files}") @@ -84,7 +89,7 @@ async def ingest_files( port=5000, ) @register_statistics(names=["opea_service@dataprep"]) -async def get_files(): +async def get_files(key_index_name: Optional[str] = File(None)): start = 
time.time() if logflag: @@ -92,7 +97,7 @@ async def get_files(): try: # Use the loader to invoke the component - response = await loader.get_files() + response = await loader.get_files(key_index_name) # Log the result if logging is enabled if logflag: logger.info(f"[ get ] ingested files: {response}") @@ -131,6 +136,34 @@ async def delete_files(file_path: str = Body(..., embed=True)): logger.error(f"Error during dataprep delete invocation: {e}") raise +@register_microservice( + name="opea_service@dataprep", + service_type=ServiceType.DATAPREP, + endpoint="/v1/dataprep/indices", + host="0.0.0.0", + port=5000, +) +@register_statistics(names=["opea_service@dataprep"]) +async def get_list_of_indices(): + start = time.time() + if logflag: + logger.info("[ get ] start to get list of indices.") + + try: + # Use the loader to invoke the component + response = await loader.get_list_of_indices() + + # Log the result if logging is enabled + if logflag: + logger.info(f"[ get ] list of indices: {response}") + + # Record statistics + statistics_dict["opea_service@dataprep"].append_latency(time.time() - start, None) + + return response + except Exception as e: + logger.error(f"Error during dataprep get list of indices: {e}") + raise if __name__ == "__main__": logger.info("OPEA Dataprep Microservice is starting...") diff --git a/comps/dataprep/src/requirements.txt b/comps/dataprep/src/requirements.txt index ba12387772..b2c7f02fbb 100644 --- a/comps/dataprep/src/requirements.txt +++ b/comps/dataprep/src/requirements.txt @@ -1,8 +1,7 @@ beautifulsoup4 cairosvg decord -docarray -# [full] +docarray[full] docx2txt easyocr einops @@ -41,7 +40,7 @@ pgvector==0.2.5 Pillow pinecone-client prometheus-fastapi-instrumentator -#psycopg2 +psycopg2 pymupdf pyspark pytesseract diff --git a/comps/dataprep/src/utils.py b/comps/dataprep/src/utils.py index f657fd2cee..c18ff5ad47 100644 --- a/comps/dataprep/src/utils.py +++ b/comps/dataprep/src/utils.py @@ -800,14 +800,14 @@ def get_file_structure(root_path: str, parent_path: str = "") -> List[Dict[str, return result -def format_search_results(response, file_list: list): +def format_search_results(response, key_index_name, file_list: list): for i in range(1, len(response), 2): - file_name = response[i].decode()[5:] + file_name = response[i].decode()[4:] file_dict = { "name": decode_filename(file_name), "id": decode_filename(file_name), "type": "File", - "parent": "", + "parent": key_index_name, } file_list.append(file_dict) return file_list diff --git a/wip_opea_dataprep_multimodal_microservice.py b/wip_opea_dataprep_multimodal_microservice.py deleted file mode 100644 index f773ccc1aa..0000000000 --- a/wip_opea_dataprep_multimodal_microservice.py +++ /dev/null @@ -1,334 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - - -import os -import time -from typing import List, Optional, Union - -from fastapi import Body, File, UploadFile -from comps.dataprep.src.integrations.redis_multimodal import OpeaMultimodalRedisDataprep -from comps.dataprep.src.integrations.vdms_multimodal import OpeaMultimodalVdmsDataprep -from comps.dataprep.src.opea_dataprep_loader import OpeaDataprepMultiModalLoader - -from comps import ( - CustomLogger, - ServiceType, - opea_microservices, - register_microservice, - register_statistics, - statistics_dict, -) -from comps.dataprep.src.utils import create_upload_folder - -logger = CustomLogger("opea_dataprep_multimodal_microservice") -logflag = os.getenv("LOGFLAG", False) -upload_folder = "./uploaded_files/" - 
-dataprep_component_name = os.getenv("DATAPREP_COMPONENT_NAME", "OPEA_DATAPREP_MULTIMODALVDMS") -# Initialize OpeaComponentLoader -loader = OpeaDataprepMultiModalLoader( - dataprep_component_name, - description=f"OPEA DATAPREP Multimodal Component: {dataprep_component_name}", -) - - -@register_microservice( - name="opea_service@dataprep_multimodal", - service_type=ServiceType.DATAPREP, - endpoint="/v1/dataprep/ingest", - host="0.0.0.0", - port=5000, -) -@register_statistics(names=["opea_service@dataprep_multimodal"]) -async def ingest_files(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - index_name: Optional[str] = File(None) - ): - start = time.time() - - if index_name: - # Set an environment variable - os.environ['INDEX_NAME'] = index_name - - if logflag: - logger.info(f"[ ingest ] files:{files}") - - try: - # Use the loader to invoke the component - response = await loader.ingest_files(files) - # Log the result if logging is enabled - if logflag: - logger.info(f"[ ingest ] Output generated: {response}") - # Record statistics - statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) - return response - except Exception as e: - logger.error(f"Error during dataprep ingest files invocation: {e}") - raise - - -@register_microservice( - name="opea_service@dataprep_multimodal", - service_type=ServiceType.DATAPREP, - endpoint="/v1/dataprep/ingest_videos", - host="0.0.0.0", - port=5000, -) -@register_statistics(names=["opea_service@dataprep_multimodal"]) -async def ingest_videos(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): - start = time.time() - - if logflag: - logger.info(f"[ ingest ] files:{files}") - - try: - # Use the loader to invoke the component - response = await loader.ingest_videos(files) - # Log the result if logging is enabled - if logflag: - logger.info(f"[ ingest ] Output generated: {response}") - # Record statistics - statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) - return response - except Exception as e: - logger.error(f"Error during dataprep ingest videos invocation: {e}") - raise - - -@register_microservice( - name="opea_service@dataprep_multimodal", - service_type=ServiceType.DATAPREP, - endpoint="/v1/dataprep/generate_transcripts", - host="0.0.0.0", - port=5000, -) -@register_statistics(names=["opea_service@dataprep_multimodal"]) -async def ingest_generate_transcripts(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): - start = time.time() - - if logflag: - logger.info(f"[ ingest ] files:{files}") - try: - # Use the loader to invoke the component - response = await loader.ingest_generate_transcripts(files) - # Log the result if logging is enabled - if logflag: - logger.info(f"[ ingest ] Output generated: {response}") - # Record statistics - statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) - return response - except Exception as e: - logger.error(f"Error during dataprep generate_transcripts invocation: {e}") - raise - - -@register_microservice( - name="opea_service@dataprep_multimodal", - service_type=ServiceType.DATAPREP, - endpoint="/v1/dataprep/generate_captions", - host="0.0.0.0", - port=5000, -) -@register_statistics(names=["opea_service@dataprep_multimodal"]) -async def ingest_generate_captions(files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): - start = time.time() - - if logflag: - logger.info(f"[ ingest ] files:{files}") - - try: - # Use the loader to invoke the 
-        response = await loader.ingest_generate_captions(files)
-        # Log the result if logging is enabled
-        if logflag:
-            logger.info(f"[ ingest ] Output generated: {response}")
-        # Record statistics
-        statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None)
-        return response
-    except Exception as e:
-        logger.error(f"Error during dataprep generate_captions invocation: {e}")
-        raise
-
-
-@register_microservice(
-    name="opea_service@dataprep_multimodal",
-    service_type=ServiceType.DATAPREP,
-    endpoint="/v1/dataprep/get",
-    host="0.0.0.0",
-    port=5000,
-)
-@register_statistics(names=["opea_service@dataprep_multimodal"])
-async def get_files():
-    start = time.time()
-
-    if logflag:
-        logger.info("[ get ] start to get ingested files")
-
-    try:
-        # Use the loader to invoke the component
-        response = await loader.get_files()
-        # Log the result if logging is enabled
-        if logflag:
-            logger.info(f"[ get ] ingested files: {response}")
-        # Record statistics
-        statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None)
-        return response
-    except Exception as e:
-        logger.error(f"Error during dataprep get files invocation: {e}")
-        raise
-
-
-@register_microservice(
-    name="opea_service@dataprep_multimodal",
-    service_type=ServiceType.DATAPREP,
-    endpoint="/v1/dataprep/get/{filename}",
-    host="0.0.0.0",
-    port=5000,
-    methods=["GET"],
-)
-@register_statistics(names=["opea_service@dataprep_multimodal"])
-async def get_one_file(filename: str):
-    start = time.time()
-
-    if logflag:
-        logger.info(f"[ get ] start to get file: {filename}")
-
-    try:
-        # Use the loader to invoke the component
-        response = await loader.get_one_file(filename)
-        # Log the result if logging is enabled
-        if logflag:
-            logger.info(f"[ get ] retrieved file: {response}")
-        # Record statistics
-        statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None)
-        return response
-    except Exception as e:
-        logger.error(f"Error during dataprep get one file invocation: {e}")
-        raise
-
-
-@register_microservice(
-    name="opea_service@dataprep_multimodal",
-    service_type=ServiceType.DATAPREP,
-    endpoint="/v1/dataprep/get_videos",
-    host="0.0.0.0",
-    port=5000,
-    methods=["GET"],
-)
-@register_statistics(names=["opea_service@dataprep_multimodal"])
-async def get_videos():
-    start = time.time()
-
-    if logflag:
-        logger.info("[ get ] start to get ingested videos")
-
-    try:
-        # Use the loader to invoke the component
-        response = await loader.get_videos()
-        # Log the result if logging is enabled
-        if logflag:
-            logger.info(f"[ get ] ingested videos: {response}")
-        # Record statistics
-        statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None)
-        return response
-    except Exception as e:
-        logger.error(f"Error during dataprep get videos invocation: {e}")
-        raise
-
-
-@register_microservice(
-    name="opea_service@dataprep_multimodal",
-    service_type=ServiceType.DATAPREP,
-    endpoint="/v1/dataprep/delete",
-    host="0.0.0.0",
-    port=5000,
-)
-@register_statistics(names=["opea_service@dataprep_multimodal"])
-async def delete_files(file_path: str = Body(..., embed=True)):
-    start = time.time()
-
-    if logflag:
-        logger.info("[ delete ] start to delete ingested files")
-
-    try:
-        # Use the loader to invoke the component
-        response = await loader.delete_files(file_path)
-        # Log the result if logging is enabled
-        if logflag:
-            logger.info(f"[ delete ] delete result: {response}")
result: {response}") - # Record statistics - statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) - return response - except Exception as e: - logger.error(f"Error during dataprep delete invocation: {e}") - raise - -@register_microservice( - name="opea_service@dataprep_multimodal", - service_type=ServiceType.DATAPREP, - endpoint="/v1/dataprep/indices", - host="0.0.0.0", - port=5000, -) -@register_statistics(names=["opea_service@dataprep_multimodal"]) -async def get_list_of_indices(): - start = time.time() - - if logflag: - logger.info("[ get ] start to get list of indices.") - - try: - # Use the loader to invoke the component - response = await loader.get_list_of_indices() - - # Log the result if logging is enabled - if logflag: - logger.info(f"[ get ] list of indices: {response}") - - # Record statistics - statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) - - return response - except Exception as e: - logger.error(f"Error during dataprep get list of indices: {e}") - raise - -@register_microservice( - name="opea_service@dataprep_multimodal", - service_type=ServiceType.DATAPREP, - endpoint="/v1/dataprep/items_of_index", - host="0.0.0.0", - port=5000, -) -@register_statistics(names=["opea_service@dataprep_multimodal"]) -async def get_items_of_index(index_name: Optional[str] = File(None) ): - start = time.time() - - #TODO : make sure index is already in the db - - if logflag: - logger.info(f"[ get ] start to get items of index:{index_name}.") - - try: - # Use the loader to invoke the component - response = await loader.get_items_of_index(index_name) - - # Log the result if logging is enabled - if logflag: - logger.info(f"[ get ] items of index: {response}") - - # Record statistics - statistics_dict["opea_service@dataprep_multimodal"].append_latency(time.time() - start, None) - - return response - except Exception as e: - logger.error(f"Error during dataprep get list of indexes: {e}") - raise - - -if __name__ == "__main__": - logger.info("OPEA Dataprep Multimodal Microservice is starting...") - create_upload_folder(upload_folder) - opea_microservices["opea_service@dataprep_multimodal"].start() From 80f3a58840bd6eb4a8b27173cefee221493add60 Mon Sep 17 00:00:00 2001 From: Mustafa Date: Mon, 3 Mar 2025 16:39:30 -0800 Subject: [PATCH 5/5] update data type for key index Signed-off-by: Mustafa --- comps/cores/proto/api_protocol.py | 3 +++ comps/cores/proto/docarray.py | 1 + tests/retrievers/test_retrievers_redis.sh | 12 +++++++----- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/comps/cores/proto/api_protocol.py b/comps/cores/proto/api_protocol.py index f8fec8d3a9..1200183740 100644 --- a/comps/cores/proto/api_protocol.py +++ b/comps/cores/proto/api_protocol.py @@ -267,6 +267,9 @@ class ChatCompletionRequest(BaseModel): # define request_type: Literal["chat"] = "chat" + + # key index name + key_index_name: Optional[str] = None class DocSumChatCompletionRequest(ChatCompletionRequest): diff --git a/comps/cores/proto/docarray.py b/comps/cores/proto/docarray.py index 55d840350b..6071f8282a 100644 --- a/comps/cores/proto/docarray.py +++ b/comps/cores/proto/docarray.py @@ -225,6 +225,7 @@ class LLMParams(BaseDoc): repetition_penalty: float = 1.03 stream: bool = True language: str = "auto" # can be "en", "zh" + key_index_name: Optional[str] = None chat_template: Optional[str] = Field( default=None, diff --git a/tests/retrievers/test_retrievers_redis.sh b/tests/retrievers/test_retrievers_redis.sh index 
index aa2bbe61fc..a962a2835c 100644
--- a/tests/retrievers/test_retrievers_redis.sh
+++ b/tests/retrievers/test_retrievers_redis.sh
@@ -18,7 +18,9 @@ service_name_mm="retriever-redis-multimodal"

 function build_docker_images() {
     cd $WORKPATH
-    docker build --no-cache -t ${REGISTRY:-opea}/retriever:${TAG:-latest} --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/src/Dockerfile .
+    # docker build --no-cache -t ${REGISTRY:-opea}/retriever:${TAG:-latest} --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/src/Dockerfile .
+    docker build -t ${REGISTRY:-opea}/retriever:${TAG:-latest} --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/src/Dockerfile .
+
     if [ $? -ne 0 ]; then
         echo "opea/retriever built fail"
         exit 1
@@ -137,7 +139,7 @@ function stop_docker() {

 function main() {

-    stop_docker
+    # stop_docker
     build_docker_images

     # test text retriever
@@ -152,9 +154,9 @@ function main() {
     validate_microservice "$test_embedding_multi" "$service_name_mm"
     validate_mm_microservice "$test_embedding_multi" "$service_name_mm"

-    # clean env
-    stop_docker
-    echo y | docker system prune
+    # # clean env
+    # stop_docker
+    # echo y | docker system prune
 }
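
Usage sketch for the index-aware dataprep endpoints: the handlers removed/added above take multipart form fields, so a client can choose a vector index per request. This is a minimal sketch, not part of the patch series, assuming the service listens on http://localhost:5000 (the port registered in the service code), that these endpoints accept POST (the framework default), and with "video.mp4" and "my-index" as hypothetical placeholder values.

    import requests

    BASE = "http://localhost:5000/v1/dataprep"

    # Ingest a file into a caller-chosen vector index via the index_name form field.
    # Passing files= and data= together makes requests send multipart/form-data,
    # which is what the File(...) parameters expect.
    with open("video.mp4", "rb") as f:
        resp = requests.post(
            f"{BASE}/ingest",
            files=[("files", ("video.mp4", f, "video/mp4"))],
            data={"index_name": "my-index"},
        )
    print(resp.json())

    # Enumerate the vector indexes currently known to the service.
    print(requests.post(f"{BASE}/indices").json())

    # Inspect the items stored under a specific index; the (None, value) trick
    # forces a multipart form field without attaching a file.
    print(requests.post(f"{BASE}/items_of_index", files={"index_name": (None, "my-index")}).json())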
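A similarly hedged sketch of how the key_index_name field from PATCH 5/5 would be populated on the request side. The import paths and field names come from the diffs above; the message payload is illustrative, and copying the field onto LLMParams is an assumption about the gateway wiring, which this series does not show end to end.

    from comps.cores.proto.api_protocol import ChatCompletionRequest
    from comps.cores.proto.docarray import LLMParams

    # Build a chat request that names the vector index the retriever should query.
    request = ChatCompletionRequest(
        messages=[{"role": "user", "content": "What happens in the video?"}],  # illustrative payload
        key_index_name="my-index",  # new field from this patch; None keeps the default index
    )

    # Hypothetical hand-off: propagate the index name to LLMParams so downstream
    # components see the caller's choice (assumed wiring, not shown in the patch).
    params = LLMParams(key_index_name=request.key_index_name)
    print(params.key_index_name)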