Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 86 additions & 61 deletions comps/dataprep/src/integrations/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def create_index(client, index_name: str = KEY_INDEX_NAME):
logger.info(f"[ create index ] creating index {index_name}")
try:
definition = IndexDefinition(index_type=IndexType.HASH, prefix=["file:"])
await client.create_index((TextField("file_name"), TextField("key_ids")), definition=definition)
await client.create_index((TextField("file_name"), TextField("key_ids"), TextField("index_name")), definition=definition)
if logflag:
logger.info(f"[ create index ] index {index_name} successfully created")
except Exception as e:
Expand All @@ -132,11 +132,11 @@ async def create_index(client, index_name: str = KEY_INDEX_NAME):
return True


async def store_by_id(client, key, value):
async def store_by_id(client, key, value, ingest_index_name):
if logflag:
logger.info(f"[ store by id ] storing ids of {key}")
try:
await client.add_document(doc_id="file:" + key, file_name=key, key_ids=value)
await client.add_document(doc_id="file:" + key, file_name=key, key_ids=value, index_name=ingest_index_name)
if logflag:
logger.info(f"[ store by id ] store document success. id: file:{key}")
except Exception as e:
Expand Down Expand Up @@ -225,7 +225,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder, index_n
await create_index(client)

try:
await store_by_id(client, key=encode_filename(ingest_index_name) + "_" + file_name, value="#".join(file_ids))
await store_by_id(client, key=encode_filename(ingest_index_name) + "_" + file_name, value="#".join(file_ids), ingest_index_name=ingest_index_name)
except Exception as e:
if logflag:
logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
Expand Down Expand Up @@ -478,7 +478,7 @@ async def ingest_files(

raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")

async def get_files(self):
async def get_files(self, index_name: str = Body(None, embed=True)):
"""Get file structure from redis database in the format of
{
"name": "File Name",
Expand Down Expand Up @@ -507,7 +507,24 @@ async def get_files(self):
# no doc retrieved
if len(response) < 2:
break

file_list = format_search_results(response, file_list)
index_name = INDEX_NAME if index_name is None else index_name
filtered_files=[]
for file in file_list:
# Ensure "index_name" key exists in the file dictionary
if "index_name" not in file:
continue

# Check if the file should be included based on the index_name
if index_name == "all" or index_name == file["index_name"]:
# Remove the index_name prefix from "name" and "id"
prefix = f"{file['index_name']}_"
file["name"] = file["name"].replace(prefix, "", 1)
file["id"] = file["id"].replace(prefix, "", 1)
filtered_files.append(file)

file_list = filtered_files
offset += SEARCH_BATCH_SIZE
# last batch
if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
Expand All @@ -527,7 +544,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True), index_name:
logger.info(f"[ redis delete ] delete files: {file_path}")

# delete all uploaded files
if file_path == "all":
if file_path == "all" and index_name is None:
if logflag:
logger.info("[ redis delete ] delete all files")

Expand Down Expand Up @@ -571,78 +588,86 @@ async def delete_files(self, file_path: str = Body(..., embed=True), index_name:
logger.info({"status": True})
return {"status": True}

delete_path = Path(upload_folder + "/" + encode_filename(file_path))
if logflag:
logger.info(f"[ redis delete ] delete_path: {delete_path}")

# partially delete files
encode_file = encode_filename(file_path)
# partially delete files based on index_name
index_name = INDEX_NAME if index_name is None else index_name
index_name_id = encode_filename(index_name)
doc_id = "file:" + index_name_id + "_" + encode_file
logger.info(f"[ redis delete ] doc id: {doc_id}")

# determine whether this file exists in db KEY_INDEX_NAME
try:
result = await search_by_id(self.key_index_client, doc_id)
key_ids = result.key_ids
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}, File {file_path} does not exists.")
raise HTTPException(
status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
)
file_ids = key_ids.split("#")

# delete file keys id in db KEY_INDEX_NAME
try:
res = await delete_by_id(self.key_index_client, doc_id)
assert res
except Exception as e:

if file_path == "all" and index_name is not None:
file_list = [ i['name'] for i in await self.get_files(index_name)]
else:
file_list = [file_path]

for file_path in file_list:
delete_path = Path(upload_folder + "/" + encode_filename(file_path))
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for key index.")
logger.info(f"[ redis delete ] delete_path: {delete_path}")

encode_file = encode_filename(file_path)
doc_id = "file:" + index_name_id + "_" + encode_file
logger.info(f"[ redis delete ] doc id: {doc_id}")

# delete file content in db index_name
for file_id in file_ids:
# determine whether this file exists in db index_name
# determine whether this file exists in db KEY_INDEX_NAME
try:
await search_by_id(self.data_index_client, file_id)
result = await search_by_id(self.key_index_client, doc_id)
key_ids = result.key_ids
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.")
logger.info(f"[ redis delete ] {e}, File {file_path} does not exists.")
raise HTTPException(
status_code=404, detail=f"File not found in db {index_name}. Please check file_path."
status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
)
file_ids = key_ids.split("#")

# delete file content
# delete file keys id in db KEY_INDEX_NAME
try:
res = await delete_by_id(self.data_index_client, file_id)
res = await delete_by_id(self.key_index_client, doc_id)
assert res
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {index_name}")
raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for index.")
logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for key index.")

# local file does not exist (restarted docker container)
if not delete_path.exists():
if logflag:
logger.info(f"[ redis delete ] File {file_path} not saved locally.")
return {"status": True}
# delete file content in db index_name
for file_id in file_ids:
# determine whether this file exists in db index_name
try:
await search_by_id(self.data_index_client, file_id)
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.")
raise HTTPException(
status_code=404, detail=f"File not found in db {index_name}. Please check file_path."
)

# delete local file
if delete_path.is_file():
# delete file on local disk
delete_path.unlink()
if logflag:
logger.info(f"[ redis delete ] File {file_path} deleted successfully.")
return {"status": True}
# delete file content
try:
res = await delete_by_id(self.data_index_client, file_id)
assert res
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {index_name}")
raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for index.")

# delete folder
else:
if logflag:
logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
# local file does not exist (restarted docker container)
if not delete_path.exists():
if logflag:
logger.info(f"[ redis delete ] File {file_path} not saved locally.")
return {"status": True}

# delete local file
if delete_path.is_file():
# delete file on local disk
delete_path.unlink()
if logflag:
logger.info(f"[ redis delete ] File {file_path} deleted successfully.")

# delete folder
else:
if logflag:
logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")

return {"status": True}

def get_list_of_indices(self):
"""Retrieves a list of all indices from the Redis client.
Expand Down
18 changes: 13 additions & 5 deletions comps/dataprep/src/opea_dataprep_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,24 @@ async def ingest_files(
port=5000,
)
@register_statistics(names=["opea_service@dataprep"])
async def get_files():
async def get_files(index_name: str = Body(None, embed=True) ):
start = time.time()

if logflag:
logger.info("[ get ] start to get ingested files")

try:
try:
# Use the loader to invoke the component
response = await loader.get_files()
if dataprep_component_name == "OPEA_DATAPREP_REDIS":
response = await loader.get_files(index_name)
else:
if index_name:
logger.error(
'Error during dataprep get files: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
)
raise
response = await loader.get_files()

# Log the result if logging is enabled
if logflag:
logger.info(f"[ get ] ingested files: {response}")
Expand Down Expand Up @@ -152,7 +161,6 @@ async def delete_files(file_path: str = Body(..., embed=True), index_name: str =
'Error during dataprep delete files: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
)
raise
# Use the loader to invoke the component
response = await loader.delete_files(file_path)

# Log the result if logging is enabled
Expand Down Expand Up @@ -205,4 +213,4 @@ async def get_list_of_indices():
if __name__ == "__main__":
logger.info("OPEA Dataprep Microservice is starting...")
create_upload_folder(upload_folder)
opea_microservices["opea_service@dataprep"].start()
opea_microservices["opea_service@dataprep"].start()
2 changes: 2 additions & 0 deletions comps/dataprep/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,11 +850,13 @@ def get_file_structure(root_path: str, parent_path: str = "") -> List[Dict[str,
def format_search_results(response, file_list: list):
    """Append one file-info dict per document found in a raw Redis search reply.

    ``response`` is the flat FT.SEARCH-style reply: element 0 is the total
    count, followed by alternating (doc id, field/value list) pairs. Doc ids
    carry the ``file:`` prefix, which is stripped here.

    Args:
        response: raw search reply (list of bytes / nested lists).
        file_list: accumulator list; new dicts are appended in place.

    Returns:
        The same ``file_list``, for call-chaining convenience.
    """
    for i in range(1, len(response), 2):
        # Drop the 5-character "file:" doc-id prefix added at ingest time.
        file_name = response[i].decode()[5:]
        file_dict = {
            "name": decode_filename(file_name),
            "id": decode_filename(file_name),
            "type": "File",
            "parent": "",
        }
        fields = response[i + 1]
        # Documents ingested before the index_name field was introduced do not
        # carry it. Omit the key for those (callers guard with
        # '"index_name" not in file') instead of letting .index() raise
        # ValueError and abort the whole listing.
        if b"index_name" in fields:
            file_dict["index_name"] = fields[fields.index(b"index_name") + 1].decode()
        file_list.append(file_dict)
    return file_list
Expand Down
32 changes: 32 additions & 0 deletions tests/dataprep/dataprep_utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,23 @@ function delete_all() {
_invoke_curl $fqdn $port delete -d '{"file_path":"all"}' $@
}

# Delete every file stored under one specific index.
#   $1 fqdn, $2 port, $3 index_name; any remaining args are forwarded to curl.
function delete_all_in_index() {
    local fqdn=$1
    local port=$2
    local index_name=$3
    shift 3
    # Quote all expansions so values containing spaces or glob characters are
    # passed through intact ("$@" preserves extra curl options verbatim).
    _invoke_curl "$fqdn" "$port" delete -d '{"file_path":"all","index_name":"'"${index_name}"'"}' "$@"
}

# Delete a single file from one specific index.
#   $1 fqdn, $2 port, $3 index_name, $4 file name; remaining args go to curl.
function delete_item_in_index() {
    local fqdn=$1
    local port=$2
    local index_name=$3
    local item=$4
    shift 4
    # Quote all expansions to avoid word splitting/globbing of arguments.
    _invoke_curl "$fqdn" "$port" delete -d '{"file_path":"'"${item}"'","index_name":"'"${index_name}"'"}' "$@"
}

function delete_single() {
local fqdn=$1
local port=$2
Expand All @@ -117,6 +134,21 @@ function get_all() {
_invoke_curl $fqdn $port get $@
}

# List ingested files across all indices (index_name="all").
#   $1 fqdn, $2 port; any remaining args are forwarded to curl.
function get_all_in_index() {
    local fqdn=$1
    local port=$2
    shift 2
    # Quote expansions so host/port and extra curl options survive word splitting.
    _invoke_curl "$fqdn" "$port" get -d '{"index_name":"all"}' "$@"
}

# List ingested files belonging to one specific index.
#   $1 fqdn, $2 port, $3 index_name; any remaining args are forwarded to curl.
function get_index() {
    local fqdn=$1
    local port=$2
    local index_name=$3
    shift 3
    # Quote expansions to keep arguments intact through word splitting/globbing.
    _invoke_curl "$fqdn" "$port" get -d '{"index_name":"'"${index_name}"'"}' "$@"
}

function ingest_txt_with_index_name() {
local fqdn=$1
local port=$2
Expand Down
17 changes: 17 additions & 0 deletions tests/dataprep/test_dataprep_redis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ function validate_microservice() {
# test /v1/dataprep/get
get_all ${ip_address} ${DATAPREP_PORT}
check_result "dataprep - get" '{"name":' dataprep-redis-server ${LOG_PATH}/dataprep_file.log

# test /v1/dataprep/get
get_all_in_index ${ip_address} ${DATAPREP_PORT}
check_result "dataprep - get" '"index_name":"rag_redis"' dataprep-redis-server ${LOG_PATH}/dataprep_file.log

# test /v1/dataprep/get
get_index ${ip_address} ${DATAPREP_PORT} rag_redis_test
check_result "dataprep - get" '"index_name":"rag_redis_test"' dataprep-redis-server ${LOG_PATH}/dataprep_file.log

# test /v1/dataprep/delete
delete_all_in_index ${ip_address} ${DATAPREP_PORT} rag_redis_test
check_result "dataprep - del" '{"status":true}' dataprep-redis-server ${LOG_PATH}/dataprep_del.log

# test /v1/dataprep/delete
delete_item_in_index ${ip_address} ${DATAPREP_PORT} rag_redis ingest_dataprep.docx
check_result "dataprep - del" '{"status":true}' dataprep-redis-server ${LOG_PATH}/dataprep_del.log

}

function stop_docker() {
Expand Down