From 98b22d08d6db03a814f19914946c453f85815f2d Mon Sep 17 00:00:00 2001
From: Mustafa
Date: Mon, 14 Apr 2025 18:16:52 -0700
Subject: [PATCH 1/4] remove index_names from files for get request

Signed-off-by: Mustafa
---
 comps/dataprep/src/integrations/redis.py      | 151 +++++++++++-------
 .../src/opea_dataprep_microservice.py         |  18 ++-
 comps/dataprep/src/utils.py                   |   3 +-
 3 files changed, 105 insertions(+), 67 deletions(-)
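
Notes (usage sketch, not part of the commit): with this change each entry in the
KEY_INDEX_NAME hash also records the index it was ingested into, and the get
endpoint accepts an optional "index_name" filter ("all" matches every index).
The curl calls below mirror the test helpers added later in this series; host
and port are illustrative (the microservice registers on port 5000), and the
POST method is an assumption based on the existing _invoke_curl helpers.

    # list files from the default index (unchanged behaviour)
    curl -X POST http://localhost:5000/v1/dataprep/get \
        -H "Content-Type: application/json"

    # list files from a single index; the "<index_name>_" prefix is stripped
    # from the returned "name" and "id" fields
    curl -X POST http://localhost:5000/v1/dataprep/get \
        -H "Content-Type: application/json" \
        -d '{"index_name": "rag_redis_test"}'

    # list files across every index
    curl -X POST http://localhost:5000/v1/dataprep/get \
        -H "Content-Type: application/json" \
        -d '{"index_name": "all"}'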
diff --git a/comps/dataprep/src/integrations/redis.py b/comps/dataprep/src/integrations/redis.py
index 07485ebcef..413ecb7d82 100644
--- a/comps/dataprep/src/integrations/redis.py
+++ b/comps/dataprep/src/integrations/redis.py
@@ -122,7 +122,7 @@ async def create_index(client, index_name: str = KEY_INDEX_NAME):
         logger.info(f"[ create index ] creating index {index_name}")
     try:
         definition = IndexDefinition(index_type=IndexType.HASH, prefix=["file:"])
-        await client.create_index((TextField("file_name"), TextField("key_ids")), definition=definition)
+        await client.create_index((TextField("file_name"), TextField("key_ids"), TextField("index_name")), definition=definition)
         if logflag:
             logger.info(f"[ create index ] index {index_name} successfully created")
     except Exception as e:
@@ -132,11 +132,11 @@ async def create_index(client, index_name: str = KEY_INDEX_NAME):
     return True
 
 
-async def store_by_id(client, key, value):
+async def store_by_id(client, key, value, ingest_index_name):
     if logflag:
         logger.info(f"[ store by id ] storing ids of {key}")
     try:
-        await client.add_document(doc_id="file:" + key, file_name=key, key_ids=value)
+        await client.add_document(doc_id="file:" + key, file_name=key, key_ids=value, index_name=ingest_index_name)
         if logflag:
             logger.info(f"[ store by id ] store document success. id: file:{key}")
     except Exception as e:
@@ -225,7 +225,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder, index_n
         await create_index(client)
 
     try:
-        await store_by_id(client, key=encode_filename(ingest_index_name) + "_" + file_name, value="#".join(file_ids))
+        await store_by_id(client, key=encode_filename(ingest_index_name) + "_" + file_name, value="#".join(file_ids), ingest_index_name=ingest_index_name)
     except Exception as e:
         if logflag:
             logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
@@ -478,7 +478,7 @@ async def ingest_files(
 
         raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")
 
-    async def get_files(self):
+    async def get_files(self, index_name: str = Body(None, embed=True)):
         """Get file structure from redis database in the format of
         {
             "name": "File Name",
@@ -507,7 +507,28 @@ async def get_files(self):
             # no doc retrieved
             if len(response) < 2:
                 break
+
             file_list = format_search_results(response, file_list)
+            index_name = INDEX_NAME if index_name is None else index_name
+            filtered_files = []
+            for file in file_list:
+                # Ensure "index_name" key exists in the file dictionary
+                if "index_name" not in file:
+                    continue
+
+                if index_name == "all":
+                    # Remove the index_names from all files if it exists
+                    file["name"] = file["name"].replace(file["index_name"] + "_", "")
+                    file["id"] = file["id"].replace(file["index_name"] + "_", "")
+                    filtered_files.append(file)
+
+                elif index_name == file["index_name"]:
+                    # Remove the index_name from the defined index of the files if it exists
+                    file["name"] = file["name"].replace(index_name + "_", "")
+                    file["id"] = file["id"].replace(index_name + "_", "")
+                    filtered_files.append(file)
+
+            file_list = filtered_files
             offset += SEARCH_BATCH_SIZE
             # last batch
             if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
@@ -527,7 +548,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True), index_name:
             logger.info(f"[ redis delete ] delete files: {file_path}")
 
         # delete all uploaded files
-        if file_path == "all":
+        if file_path == "all" and index_name is None:
            if logflag:
                logger.info("[ redis delete ] delete all files")
@@ -571,78 +592,86 @@ async def delete_files(self, file_path: str = Body(..., embed=True), index_name:
                 logger.info({"status": True})
             return {"status": True}
 
-        delete_path = Path(upload_folder + "/" + encode_filename(file_path))
-        if logflag:
-            logger.info(f"[ redis delete ] delete_path: {delete_path}")
-
-        # partially delete files
-        encode_file = encode_filename(file_path)
+        # partially delete files based on index_name
         index_name = INDEX_NAME if index_name is None else index_name
         index_name_id = encode_filename(index_name)
-        doc_id = "file:" + index_name_id + "_" + encode_file
-        logger.info(f"[ redis delete ] doc id: {doc_id}")
-
-        # determine whether this file exists in db KEY_INDEX_NAME
-        try:
-            result = await search_by_id(self.key_index_client, doc_id)
-            key_ids = result.key_ids
-        except Exception as e:
-            if logflag:
-                logger.info(f"[ redis delete ] {e}, File {file_path} does not exists.")
-            raise HTTPException(
-                status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
-            )
-        file_ids = key_ids.split("#")
-
-        # delete file keys id in db KEY_INDEX_NAME
-        try:
-            res = await delete_by_id(self.key_index_client, doc_id)
-            assert res
-        except Exception as e:
+
+        if file_path == "all" and index_name is not None:
+            file_list = [i["name"] for i in await self.get_files(index_name)]
+        else:
+            file_list = [file_path]
+
+        for file_path in file_list:
+            delete_path = Path(upload_folder + "/" + encode_filename(file_path))
             if logflag:
-                logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
-            raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for key index.")
+                logger.info(f"[ redis delete ] delete_path: {delete_path}")
+
+            encode_file = encode_filename(file_path)
+            doc_id = "file:" + index_name_id + "_" + encode_file
+            logger.info(f"[ redis delete ] doc id: {doc_id}")
 
-        # delete file content in db index_name
-        for file_id in file_ids:
-            # determine whether this file exists in db index_name
+            # determine whether this file exists in db KEY_INDEX_NAME
             try:
-                await search_by_id(self.data_index_client, file_id)
+                result = await search_by_id(self.key_index_client, doc_id)
+                key_ids = result.key_ids
             except Exception as e:
                 if logflag:
-                    logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.")
+                    logger.info(f"[ redis delete ] {e}, File {file_path} does not exist.")
                 raise HTTPException(
-                    status_code=404, detail=f"File not found in db {index_name}. Please check file_path."
+                    status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
                 )
+            file_ids = key_ids.split("#")
 
-            # delete file content
+            # delete file keys id in db KEY_INDEX_NAME
             try:
-                res = await delete_by_id(self.data_index_client, file_id)
+                res = await delete_by_id(self.key_index_client, doc_id)
                 assert res
             except Exception as e:
                 if logflag:
-                    logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {index_name}")
-                raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for index.")
+                    logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for key index.")
-        # local file does not exist (restarted docker container)
-        if not delete_path.exists():
-            if logflag:
-                logger.info(f"[ redis delete ] File {file_path} not saved locally.")
-            return {"status": True}
+            # delete file content in db index_name
+            for file_id in file_ids:
+                # determine whether this file exists in db index_name
+                try:
+                    await search_by_id(self.data_index_client, file_id)
+                except Exception as e:
+                    if logflag:
+                        logger.info(f"[ redis delete ] {e}. File {file_path} does not exist.")
+                    raise HTTPException(
+                        status_code=404, detail=f"File not found in db {index_name}. Please check file_path."
+                    )
 
-        # delete local file
-        if delete_path.is_file():
-            # delete file on local disk
-            delete_path.unlink()
-            if logflag:
-                logger.info(f"[ redis delete ] File {file_path} deleted successfully.")
-            return {"status": True}
+                # delete file content
+                try:
+                    res = await delete_by_id(self.data_index_client, file_id)
+                    assert res
+                except Exception as e:
+                    if logflag:
+                        logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {index_name}")
+                    raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for index.")
 
-        # delete folder
-        else:
-            if logflag:
-                logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
-            raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
+            # local file does not exist (restarted docker container)
+            if not delete_path.exists():
+                if logflag:
+                    logger.info(f"[ redis delete ] File {file_path} not saved locally.")
+                return {"status": True}
+
+            # delete local file
+            if delete_path.is_file():
+                # delete file on local disk
+                delete_path.unlink()
+                if logflag:
+                    logger.info(f"[ redis delete ] File {file_path} deleted successfully.")
+
+            # delete folder
+            else:
+                if logflag:
+                    logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
+                raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
+
+        return {"status": True}
 
     def get_list_of_indices(self):
         """Retrieves a list of all indices from the Redis client.
diff --git a/comps/dataprep/src/opea_dataprep_microservice.py b/comps/dataprep/src/opea_dataprep_microservice.py
index ac5c5443ad..849f426923 100644
--- a/comps/dataprep/src/opea_dataprep_microservice.py
+++ b/comps/dataprep/src/opea_dataprep_microservice.py
@@ -108,15 +108,24 @@ async def ingest_files(
     port=5000,
 )
 @register_statistics(names=["opea_service@dataprep"])
-async def get_files():
+async def get_files(index_name: str = Body(None, embed=True)):
     start = time.time()
     if logflag:
         logger.info("[ get ] start to get ingested files")
-    try:
+    try:
         # Use the loader to invoke the component
-        response = await loader.get_files()
+        if dataprep_component_name == "OPEA_DATAPREP_REDIS":
+            response = await loader.get_files(index_name)
+        else:
+            if index_name:
+                logger.error(
+                    'Error during dataprep get files: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
+                )
+                raise
+            response = await loader.get_files()
+
         # Log the result if logging is enabled
         if logflag:
             logger.info(f"[ get ] ingested files: {response}")
@@ -152,7 +161,6 @@ async def delete_files(file_path: str = Body(..., embed=True), index_name: str =
                 'Error during dataprep delete files: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
             )
             raise
-
         # Use the loader to invoke the component
         response = await loader.delete_files(file_path)
         # Log the result if logging is enabled
@@ -205,4 +213,4 @@ async def get_list_of_indices():
 if __name__ == "__main__":
     logger.info("OPEA Dataprep Microservice is starting...")
     create_upload_folder(upload_folder)
-    opea_microservices["opea_service@dataprep"].start()
+    opea_microservices["opea_service@dataprep"].start()
\ No newline at end of file
diff --git a/comps/dataprep/src/utils.py b/comps/dataprep/src/utils.py
index ec6954eb93..e8adab6366 100644
--- a/comps/dataprep/src/utils.py
+++ b/comps/dataprep/src/utils.py
@@ -850,16 +850,17 @@ def get_file_structure(root_path: str, parent_path: str = "") -> List[Dict[str,
 def format_search_results(response, file_list: list):
     for i in range(1, len(response), 2):
         file_name = response[i].decode()[5:]
+        index_name_index = response[i + 1].index(b"index_name")
         file_dict = {
             "name": decode_filename(file_name),
             "id": decode_filename(file_name),
             "type": "File",
             "parent": "",
+            "index_name": response[i + 1][index_name_index + 1].decode(),
         }
         file_list.append(file_dict)
     return file_list
 
-
 def format_file_list(file_list: list):
     res_list = []
     for file_name in file_list:

From e21ef10af3c3a6f0dd443b690ddf40d46a976966 Mon Sep 17 00:00:00 2001
From: Mustafa
Date: Tue, 15 Apr 2025 14:13:26 -0700
Subject: [PATCH 2/4] update the tests

Signed-off-by: Mustafa
---
 comps/dataprep/src/integrations/redis.py | 16 +++++-------
 comps/dataprep/src/utils.py              |  1 +
 tests/dataprep/dataprep_utils.sh         | 33 ++++++++++++++++++++++++
 tests/dataprep/test_dataprep_redis.sh    | 17 ++++++++++++
 4 files changed, 57 insertions(+), 10 deletions(-)
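
Notes (usage sketch, not part of the commit): the shell helpers added below wrap
per-index delete requests. Hedged curl equivalents, with host/port illustrative
and the index and file names taken from the test script:

    # delete every file that belongs to index "rag_redis_test"
    curl -X POST http://localhost:5000/v1/dataprep/delete \
        -H "Content-Type: application/json" \
        -d '{"file_path": "all", "index_name": "rag_redis_test"}'

    # delete a single file from index "rag_redis"
    curl -X POST http://localhost:5000/v1/dataprep/delete \
        -H "Content-Type: application/json" \
        -d '{"file_path": "ingest_dataprep.docx", "index_name": "rag_redis"}'

A plain '{"file_path": "all"}' body with no index_name keeps the old behaviour
of wiping all uploaded files.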
diff --git a/comps/dataprep/src/integrations/redis.py b/comps/dataprep/src/integrations/redis.py
index 413ecb7d82..d42f180a3c 100644
--- a/comps/dataprep/src/integrations/redis.py
+++ b/comps/dataprep/src/integrations/redis.py
@@ -516,16 +516,12 @@ async def get_files(self, index_name: str = Body(None, embed=True)):
                 if "index_name" not in file:
                     continue
 
-                if index_name == "all":
-                    # Remove the index_names from all files if it exists
-                    file["name"] = file["name"].replace(file["index_name"] + "_", "")
-                    file["id"] = file["id"].replace(file["index_name"] + "_", "")
-                    filtered_files.append(file)
-
-                elif index_name == file["index_name"]:
-                    # Remove the index_name from the defined index of the files if it exists
-                    file["name"] = file["name"].replace(index_name + "_", "")
-                    file["id"] = file["id"].replace(index_name + "_", "")
+                # Check if the file should be included based on the index_name
+                if index_name == "all" or index_name == file["index_name"]:
+                    # Remove the index_name prefix from "name" and "id"
+                    prefix = f"{file['index_name']}_"
+                    file["name"] = file["name"].replace(prefix, "", 1)
+                    file["id"] = file["id"].replace(prefix, "", 1)
                     filtered_files.append(file)
 
             file_list = filtered_files
diff --git a/comps/dataprep/src/utils.py b/comps/dataprep/src/utils.py
index e8adab6366..956731783e 100644
--- a/comps/dataprep/src/utils.py
+++ b/comps/dataprep/src/utils.py
@@ -861,6 +861,7 @@ def format_search_results(response, file_list: list):
         file_list.append(file_dict)
     return file_list
 
+
 def format_file_list(file_list: list):
     res_list = []
     for file_name in file_list:
diff --git a/tests/dataprep/dataprep_utils.sh b/tests/dataprep/dataprep_utils.sh
index 083fbdd7b8..1cf1ec4d3b 100644
--- a/tests/dataprep/dataprep_utils.sh
+++ b/tests/dataprep/dataprep_utils.sh
@@ -103,6 +103,23 @@ function delete_all() {
     _invoke_curl $fqdn $port delete -d '{"file_path":"all"}' $@
 }
 
+function delete_all_in_index() {
+    local fqdn=$1
+    local port=$2
+    local index_name=$3
+    shift 3
+    _invoke_curl $fqdn $port delete -d '{"file_path":"all","index_name":"'${index_name}'"}' $@
+}
+
+function delete_item_in_index() {
+    local fqdn=$1
+    local port=$2
+    local index_name=$3
+    local item=$4
+    shift 4
+    _invoke_curl $fqdn $port delete -d '{"file_path":"'${item}'","index_name":"'${index_name}'"}' $@
+}
+
 function delete_single() {
     local fqdn=$1
     local port=$2
@@ -117,6 +134,22 @@ function get_all() {
     _invoke_curl $fqdn $port get $@
 }
 
+# function delete_all_in_index() {
+#     local fqdn=$1
+#     local port=$2
+#     local index_name=$3
+#     shift 3
+#     _invoke_curl $fqdn $port get -d '{"index_name":"all"}' $@
+# }
+
+function get_index() {
+    local fqdn=$1
+    local port=$2
+    local index_name=$3
+    shift 3
+    _invoke_curl $fqdn $port get -d '{"index_name":"'${index_name}'"}' $@
+}
+
 function ingest_txt_with_index_name() {
     local fqdn=$1
     local port=$2
diff --git a/tests/dataprep/test_dataprep_redis.sh b/tests/dataprep/test_dataprep_redis.sh
index d5c5a9492f..cd8f2cf7dd 100644
--- a/tests/dataprep/test_dataprep_redis.sh
+++ b/tests/dataprep/test_dataprep_redis.sh
@@ -87,6 +87,23 @@ function validate_microservice() {
     # test /v1/dataprep/get
     get_all ${ip_address} ${DATAPREP_PORT}
     check_result "dataprep - get" '{"name":' dataprep-redis-server ${LOG_PATH}/dataprep_file.log
+
+    # test /v1/dataprep/get
+    get_all_index ${ip_address} ${DATAPREP_PORT}
+    check_result "dataprep - get" '"index_name":"rag_redis"' dataprep-redis-server ${LOG_PATH}/dataprep_file.log
+
+    # test /v1/dataprep/get
+    get_index ${ip_address} ${DATAPREP_PORT} rag_redis_test
+    check_result "dataprep - get" '"index_name":"rag_redis_test"' dataprep-redis-server ${LOG_PATH}/dataprep_file.log
+
+    # test /v1/dataprep/delete
+    delete_all_in_index ${ip_address} ${DATAPREP_PORT} rag_redis_test
+    check_result "dataprep - del" '{"status":true}' dataprep-redis-server ${LOG_PATH}/dataprep_del.log
+
+    # test /v1/dataprep/delete
+    delete_item_in_index ${ip_address} ${DATAPREP_PORT} rag_redis ingest_dataprep.docx
+    check_result "dataprep - del" '{"status":true}' dataprep-redis-server ${LOG_PATH}/dataprep_del.log
+
 }
 
 function stop_docker() {

From a9a5bd5e7e4d846149854f196030e038b04c39cf Mon Sep 17 00:00:00 2001
From: Mustafa
Date: Tue, 15 Apr 2025 14:16:37 -0700
Subject: [PATCH 3/4] update the tests

Signed-off-by: Mustafa
---
 tests/dataprep/dataprep_utils.sh | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/tests/dataprep/dataprep_utils.sh b/tests/dataprep/dataprep_utils.sh
index 1cf1ec4d3b..146234b659 100644
--- a/tests/dataprep/dataprep_utils.sh
+++ b/tests/dataprep/dataprep_utils.sh
@@ -134,14 +134,6 @@ function get_all() {
     _invoke_curl $fqdn $port get $@
 }
 
-# function delete_all_in_index() {
-#     local fqdn=$1
-#     local port=$2
-#     local index_name=$3
-#     shift 3
-#     _invoke_curl $fqdn $port get -d '{"index_name":"all"}' $@
-# }
-
 function get_index() {
     local fqdn=$1
     local port=$2

From 39d01c53f17b1c1e5877750ed187121d70855dd5 Mon Sep 17 00:00:00 2001
From: Mustafa
Date: Tue, 15 Apr 2025 14:29:15 -0700
Subject: [PATCH 4/4] update the tests

Signed-off-by: Mustafa
---
 tests/dataprep/dataprep_utils.sh      | 7 +++++++
 tests/dataprep/test_dataprep_redis.sh | 2 +-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/tests/dataprep/dataprep_utils.sh b/tests/dataprep/dataprep_utils.sh
index 146234b659..ec5e2a7893 100644
--- a/tests/dataprep/dataprep_utils.sh
+++ b/tests/dataprep/dataprep_utils.sh
@@ -134,6 +134,13 @@ function get_all() {
     _invoke_curl $fqdn $port get $@
 }
+function get_all_in_index() {
+    local fqdn=$1
+    local port=$2
+    shift 2
+    _invoke_curl $fqdn $port get -d '{"index_name":"all"}' $@
+}
+
 function get_index() {
     local fqdn=$1
     local port=$2
diff --git a/tests/dataprep/test_dataprep_redis.sh b/tests/dataprep/test_dataprep_redis.sh
index cd8f2cf7dd..48bfeb9724 100644
--- a/tests/dataprep/test_dataprep_redis.sh
+++ b/tests/dataprep/test_dataprep_redis.sh
@@ -89,7 +89,7 @@ function validate_microservice() {
     check_result "dataprep - get" '{"name":' dataprep-redis-server ${LOG_PATH}/dataprep_file.log
 
     # test /v1/dataprep/get
-    get_all_index ${ip_address} ${DATAPREP_PORT}
+    get_all_in_index ${ip_address} ${DATAPREP_PORT}
     check_result "dataprep - get" '"index_name":"rag_redis"' dataprep-redis-server ${LOG_PATH}/dataprep_file.log
 
     # test /v1/dataprep/get
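
Response shape (illustrative, not part of the patches): after PATCH 1/4,
format_search_results attaches an "index_name" field to every entry, so a get
request with '{"index_name": "all"}' is expected to return items of the form
below. The values are examples based on the tests, not captured output:

    [
      {
        "name": "ingest_dataprep.docx",
        "id": "ingest_dataprep.docx",
        "type": "File",
        "parent": "",
        "index_name": "rag_redis"
      }
    ]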