3 changes: 3 additions & 0 deletions comps/cores/proto/api_protocol.py
@@ -267,6 +267,9 @@ class ChatCompletionRequest(BaseModel):

# define
request_type: Literal["chat"] = "chat"

# key index name
key_index_name: Optional[str] = None


class DocSumChatCompletionRequest(ChatCompletionRequest):
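Note: a minimal sketch of how a caller could exercise the new field; the message payload and the index value "my-project-keys" are illustrative, not defined in this diff:

from comps.cores.proto.api_protocol import ChatCompletionRequest

# Hypothetical request that routes file-key lookups to a non-default index
req = ChatCompletionRequest(
    messages=[{"role": "user", "content": "Summarize the uploaded report."}],
    key_index_name="my-project-keys",
)
assert req.request_type == "chat"              # discriminator defined above
assert req.key_index_name == "my-project-keys"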
3 changes: 2 additions & 1 deletion comps/cores/proto/docarray.py
@@ -102,7 +102,7 @@ class EmbedDoc(BaseDoc):
lambda_mult: float = 0.5
score_threshold: float = 0.2
constraints: Optional[Union[Dict[str, Any], List[Dict[str, Any]], None]] = None

index_name: Optional[str] = None

class EmbedMultimodalDoc(EmbedDoc):
# extend EmbedDoc with these attributes
@@ -225,6 +225,7 @@ class LLMParams(BaseDoc):
repetition_penalty: float = 1.03
stream: bool = True
language: str = "auto" # can be "en", "zh"
key_index_name: Optional[str] = None

chat_template: Optional[str] = Field(
default=None,
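Note: a short sketch of the two new docarray fields, assuming EmbedDoc's required fields are text and embedding; both index names are placeholders:

from comps.cores.proto.docarray import EmbedDoc, LLMParams

# index_name selects which vector index the retriever should query
doc = EmbedDoc(text="quarterly revenue grew 12%", embedding=[0.1, 0.2, 0.3], index_name="rag-redis")
# key_index_name threads the file-key index choice through the LLM pipeline
params = LLMParams(key_index_name="file-keys")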
95 changes: 63 additions & 32 deletions comps/dataprep/src/integrations/redis.py
@@ -99,12 +99,12 @@ def format_redis_conn_from_env():
REDIS_URL = format_redis_conn_from_env()
redis_pool = redis.ConnectionPool.from_url(REDIS_URL)


-def check_index_existance(client):
+def check_index_existance(client, index_name: str = KEY_INDEX_NAME):
if logflag:
logger.info(f"[ check index existence ] checking {client}")
try:
results = client.search("*")

if logflag:
logger.info(f"[ check index existence ] index of client exists: {client}")
return results
@@ -120,6 +120,8 @@ def create_index(client, index_name: str = KEY_INDEX_NAME):
try:
definition = IndexDefinition(index_type=IndexType.HASH, prefix=["file:"])
client.create_index((TextField("file_name"), TextField("key_ids")), definition=definition)
# client.create_index((TextField("index_name"), TextField("file_name"), TextField("key_ids")), definition=definition)

if logflag:
logger.info(f"[ create index ] index {index_name} successfully created")
except Exception as e:
@@ -131,14 +133,14 @@ def create_index(client, index_name: str = KEY_INDEX_NAME):

def store_by_id(client, key, value):
if logflag:
logger.info(f"[ store by id ] storing ids of {key}")
logger.info(f"[ store by id ] storing ids of {client.index_name + '_' + key}")
try:
client.add_document(doc_id="file:" + key, file_name=key, key_ids=value)
client.add_document(doc_id="file:" + client.index_name + '_' + key, file_name=client.index_name + '_' + key, key_ids=value)
if logflag:
logger.info(f"[ store by id ] store document success. id: file:{key}")
logger.info(f"[ store by id ] store document success. id: file:{client.index_name + '_' + key}")
except Exception as e:
if logflag:
logger.info(f"[ store by id ] fail to store document file:{key}: {e}")
logger.info(f"[ store by id ] fail to store document file:{client.index_name + '_' + key}: {e}")
return False
return True
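Note: the net effect is that file records are now namespaced by the index the search client was created for. A worked example of the new id scheme, with illustrative values:

# client = r.ft("file-keys")  ->  client.index_name == "file-keys"
key = "report.pdf"
doc_id = "file:" + "file-keys" + "_" + key
# doc_id == "file:file-keys_report.pdf"; file_name gets the same prefix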

@@ -184,8 +186,9 @@ def delete_by_id(client, id):


def ingest_chunks_to_redis(file_name: str, chunks: List):
KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys")
if logflag:
logger.info(f"[ redis ingest chunks ] file name: {file_name}")
logger.info(f"[ redis ingest chunks ] file name: '{file_name}' to '{KEY_INDEX_NAME}' index.")
# Create vectorstore
if TEI_EMBEDDING_ENDPOINT:
if not HUGGINGFACEHUB_API_TOKEN:
@@ -223,23 +226,26 @@ def ingest_chunks_to_redis(file_name: str, chunks: List):
_, keys = Redis.from_texts_return_keys(
texts=batch_texts,
embedding=embedder,
-            index_name=INDEX_NAME,
+            index_name=KEY_INDEX_NAME,
redis_url=REDIS_URL,
)
keys = [k.replace(KEY_INDEX_NAME, KEY_INDEX_NAME + '_' + file_name) for k in keys]
if logflag:
logger.info(f"[ redis ingest chunks ] keys: {keys}")
file_ids.extend(keys)
if logflag:
logger.info(f"[ redis ingest chunks ] Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")

    # store file_ids into the key index (KEY_INDEX_NAME, default "file-keys")
r = redis.Redis(connection_pool=redis_pool)
client = r.ft(KEY_INDEX_NAME)

    if not check_index_existance(client):
-        assert create_index(client)
+        assert create_index(client, index_name=KEY_INDEX_NAME)
try:
assert store_by_id(client, key=file_name, value="#".join(file_ids))

except Exception as e:
if logflag:
logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
@@ -288,6 +294,7 @@ def ingest_data_to_redis(doc_path: DocPath):
logger.info(f"[ redis ingest data ] Done preprocessing. Created {len(chunks)} chunks of the given file.")

file_name = doc_path.path.split("/")[-1]

return ingest_chunks_to_redis(file_name, chunks)


@@ -364,34 +371,42 @@ async def ingest_files(
if logflag:
logger.info(f"[ redis ingest ] files:{files}")
logger.info(f"[ redis ingest ] link_list:{link_list}")


KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys")
if KEY_INDEX_NAME != "file-keys":
logger.info(f"KEY_INDEX_NAME: {KEY_INDEX_NAME} is different than the default one. Setting up the parameters.")
self.data_index_client = self.client.ft(INDEX_NAME)
self.key_index_client = self.client.ft(KEY_INDEX_NAME)

if files:
if not isinstance(files, list):
files = [files]
uploaded_files = []

for file in files:
encode_file = encode_filename(file.filename)
doc_id = "file:" + encode_file
doc_id = "file:" + KEY_INDEX_NAME + '_' + encode_file
if logflag:
logger.info(f"[ redis ingest ] processing file {doc_id}")

-            # check whether the file already exists
-            key_ids = None
-            try:
-                key_ids = search_by_id(self.key_index_client, doc_id).key_ids
-                if logflag:
-                    logger.info(f"[ redis ingest] File {file.filename} already exists.")
-            except Exception as e:
-                logger.info(f"[ redis ingest] File {file.filename} does not exist.")
-            if key_ids:
-                raise HTTPException(
-                    status_code=400,
-                    detail=f"Uploaded file {file.filename} already exists. Please change file name.",
-                )
+            if KEY_INDEX_NAME in self.get_list_of_indices():
+                # check whether the file already exists
+                key_ids = None
+                try:
+                    key_ids = search_by_id(self.key_index_client, doc_id).key_ids
+                    if logflag:
+                        logger.info(f"[ redis ingest] File '{file.filename}' already exists in '{KEY_INDEX_NAME}' index.")
+                except Exception as e:
+                    logger.info(f"[ redis ingest] File {file.filename} does not exist.")
+                if key_ids:
+                    raise HTTPException(
+                        status_code=400,
+                        detail=f"Uploaded file '{file.filename}' already exists in '{KEY_INDEX_NAME}' index. Please change file name or 'index_name'.",
+                    )

save_path = upload_folder + encode_file
await save_content_to_local_disk(save_path, file)

ingest_data_to_redis(
DocPath(
path=save_path,
@@ -451,36 +466,38 @@ async def ingest_files(

raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")

-    async def get_files(self):
+    async def get_files(self, key_index_name=KEY_INDEX_NAME):
"""Get file structure from redis database in the format of
{
"name": "File Name",
"id": "File Name",
"type": "File",
"parent": "",
}"""


if key_index_name is None:
key_index_name = KEY_INDEX_NAME
if logflag:
logger.info("[ redis get ] start to get file structure")

offset = 0
file_list = []

# check index existence
-        res = check_index_existance(self.key_index_client)
+        res = key_index_name in self.get_list_of_indices()
if not res:
if logflag:
logger.info(f"[ redis get ] index {KEY_INDEX_NAME} does not exist")
logger.info(f"[ redis get ] index {key_index_name} does not exist")
return file_list

while True:
response = self.client.execute_command(
"FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE
"FT.SEARCH", key_index_name, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE
)
# no doc retrieved
if len(response) < 2:
break
-            file_list = format_search_results(response, file_list)
+            file_list = format_search_results(response, key_index_name, file_list)
offset += SEARCH_BATCH_SIZE
# last batch
if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
@@ -608,3 +625,17 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
if logflag:
logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")

def get_list_of_indices(self):
"""
Retrieves a list of all indices from the Redis client.

Returns:
A list of index names as strings.
"""
# Execute the command to list all indices
indices = self.client.execute_command('FT._LIST')
# Decode each index name from bytes to string
indices_list = [item.decode('utf-8') for item in indices]
return indices_list
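Note: a standalone sketch of what get_list_of_indices relies on, assuming a local Redis Stack instance; FT._LIST is the RediSearch command that returns every index name:

import redis

r = redis.Redis(host="localhost", port=6379)
indices = [name.decode("utf-8") for name in r.execute_command("FT._LIST")]
print(indices)  # e.g. ['rag-redis', 'file-keys']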

67 changes: 64 additions & 3 deletions comps/dataprep/src/integrations/redis_multimodal.py
@@ -7,6 +7,8 @@
import shutil
import time
import uuid
import redis

from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Type, Union

@@ -105,6 +107,8 @@ def format_redis_conn_from_env():
logger = CustomLogger("opea_dataprep_redis_multimodal")
logflag = os.getenv("LOGFLAG", False)

redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=os.getenv("REDIS_PASSWORD", None))


class MultimodalRedis(Redis):
"""Redis vector database to process multimodal data."""
@@ -192,6 +196,7 @@ def from_text_image_pairs_return_keys(
if images
else instance.add_text(texts, metadatas, keys=keys)
)

return instance, keys

def add_text_image_pairs(
@@ -329,6 +334,10 @@ class OpeaMultimodalRedisDataprep(OpeaComponent):

def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, ServiceType.DATAPREP.name.lower(), description, config)

print(">>>>>>>>>>>>>>>>>>>> OpeaMultimodalRedisDataprep - __init__ name:", name)
print(">>>>>>>>>>>>>>>>>>>> OpeaMultimodalRedisDataprep - __init__ description:", description)

self.device = "cpu"
self.upload_folder = "./uploaded_files/"
# Load embeddings model
@@ -454,6 +463,8 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False):
path_to_frames = os.path.join(data_folder, "frames")

annotation = load_json_file(annotation_file_path)

print(">>>>>>>>>>>>>>>>>>>> ingest_multimodal - annotation:", annotation)

# prepare data to ingest
if is_pdf:
@@ -464,7 +475,13 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False):
text_list, image_list, metadatas = self.prepare_data_and_metadata_from_annotation(
annotation, path_to_frames, filename
)


INDEX_NAME = os.getenv("INDEX_NAME", "mm-rag-redis")

print(">>>>>>>>>>>>>>>>>>>> ingest_multimodal - filename:", filename)
print(">>>>>>>>>>>>>>>>>>>> ingest_multimodal - text_list:", len(text_list))
print()

MultimodalRedis.from_text_image_pairs_return_keys(
texts=[f"From {filename}. " + text for text in text_list],
images=image_list,
@@ -474,7 +491,48 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False):
index_schema=INDEX_SCHEMA,
redis_url=REDIS_URL,
)

def get_list_of_indices(self, redis_client=redis_client):
"""
Retrieves a list of all indices from the Redis client.

Args:
redis_client: The Redis client instance to use for executing commands.

Returns:
A list of index names as strings.
"""
# Execute the command to list all indices
indices = redis_client.execute_command('FT._LIST')
# Decode each index name from bytes to string and strip any surrounding single quotes
indices_list = [item.decode('utf-8').strip("'") for item in indices]
print(">>>>>>>>>>>>>>>>>>>> redis mm - get_list_of_indices ", indices_list)
return indices_list

def get_items_of_index(self, index_name=INDEX_NAME, redis_client=redis_client):
"""
Retrieves items from a specific index in Redis.

Args:
index_name: The name of the index to search.
redis_client: The Redis client instance to use for executing commands.

Returns:
A sorted list of items from the specified index.
"""
# Execute the command to search for all items in the specified index
        results = redis_client.execute_command("FT.SEARCH", index_name, "*", "LIMIT", 0, 100)
list_of_items = []
        # Iterate through the results; only the inner lists hold field/value pairs
        for r in results:
            if isinstance(r, list):
                # Extract and decode the value that follows a 'source_video' field
                matches = [r[i + 1].decode("utf-8") for i, v in enumerate(r) if "source_video" in str(v)]
                if matches:
                    list_of_items.append(matches[0])
# Return the sorted list of items
return sorted(list_of_items)
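Note: the parsing above assumes redis-py returns FT.SEARCH replies as a flat list: a total count, then alternating document ids and field/value lists. A hypothetical reply and the extraction it drives:

results = [
    2,
    b"doc:a", [b"source_video", b"clip1.mp4"],
    b"doc:b", [b"source_video", b"clip2.mp4"],
]
items = []
for r in results:
    if isinstance(r, list):
        matches = [r[i + 1].decode("utf-8") for i, v in enumerate(r) if "source_video" in str(v)]
        if matches:
            items.append(matches[0])
# sorted(items) == ['clip1.mp4', 'clip2.mp4']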

def drop_index(self, index_name, redis_url=REDIS_URL):
logger.info(f"dropping index {index_name}")
try:
@@ -648,6 +706,7 @@ async def ingest_generate_captions(self, files: List[UploadFile] = File(None)):
raise HTTPException(status_code=400, detail="Must provide at least one file.")

async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)):

if files:
accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf"]
# Create a lookup dictionary containing all media files
@@ -706,6 +765,7 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]]
uploaded_files_map[file_name] = media_file_name

if file_extension == ".pdf":

# Set up location to store pdf images and text, reusing "frames" and "annotations" from video
output_dir = os.path.join(self.upload_folder, media_dir_name)
os.makedirs(output_dir, exist_ok=True)
@@ -745,14 +805,15 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]]
"sub_video_id": image_idx,
}
)

with open(os.path.join(output_dir, "annotations.json"), "w") as f:
json.dump(annotations, f)

# Ingest multimodal data into redis
self.ingest_multimodal(
file_name, os.path.join(self.upload_folder, media_dir_name), self.embeddings, is_pdf=True
)

else:
# Save caption file in upload directory
caption_file_extension = os.path.splitext(matched_files[media_file][1].filename)[1]
@@ -792,7 +853,7 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]]

async def get_files(self):
"""Returns list of names of uploaded videos saved on the server."""

if not Path(self.upload_folder).exists():
logger.info("No file uploaded, return empty list.")
return []