Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/agents/create_agent_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ async def create_tool_config_list(agent_id, tenant_id, user_id, version_no: int

# special logic for search tools that may use reranking models
if tool_config.class_name == "KnowledgeBaseSearchTool":
is_multimodal = tool_config.params.pop("multimodal", False)
rerank = param_dict.get("rerank", False)
rerank_model_name = param_dict.get("rerank_model_name", "")
rerank_model = None
Expand All @@ -363,7 +364,7 @@ async def create_tool_config_list(agent_id, tenant_id, user_id, version_no: int

tool_config.metadata = {
"vdb_core": get_vector_db_core(),
"embedding_model": get_embedding_model(tenant_id=tenant_id),
"embedding_model": get_embedding_model(tenant_id=tenant_id, is_multimodal=is_multimodal),
"rerank_model": rerank_model,
}
elif tool_config.class_name in ["DifySearchTool", "DataMateSearchTool"]:
Expand Down
20 changes: 11 additions & 9 deletions backend/apps/file_management_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re
import base64
from http import HTTPStatus
from typing import List, Optional
from typing import Annotated, List, Optional
from urllib.parse import urlparse, urlunparse, unquote, quote

import httpx
Expand Down Expand Up @@ -115,12 +115,13 @@ async def upload_files(

@file_management_config_router.post("/process")
async def process_files(
files: List[dict] = Body(
..., description="List of file details to process, including path_or_url and filename"),
chunking_strategy: Optional[str] = Body("basic"),
index_name: str = Body(...),
destination: str = Body(...),
authorization: Optional[str] = Header(None)
files: Annotated[List[dict], Body(
..., description="List of file details to process, including path_or_url and filename")],
index_name: Annotated[str, Body(...)],
destination: Annotated[str, Body(...)],
chunking_strategy: Annotated[Optional[str], Body(...)] = "basic",
is_multimodal: Annotated[Optional[bool], Body(...)] = False,
authorization: Annotated[Optional[str], Header()] = None
):
"""
Trigger data processing for a list of uploaded files.
Expand All @@ -133,7 +134,8 @@ async def process_files(
chunking_strategy=chunking_strategy,
source_type=destination,
index_name=index_name,
authorization=authorization
authorization=authorization,
is_multimodal=is_multimodal
)

process_result = await trigger_data_process(files, process_params)
Expand Down Expand Up @@ -639,4 +641,4 @@ async def preview_file(
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Failed to preview file: {str(e)}"
)
)
7 changes: 4 additions & 3 deletions backend/apps/model_managment_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from http import HTTPStatus
from typing import List, Optional
from typing import Annotated, List, Optional
from services.model_health_service import (
check_model_connectivity,
verify_model_config_connectivity,
Expand Down Expand Up @@ -297,7 +297,8 @@ async def get_llm_model_list(authorization: Optional[str] = Header(None)):

@router.post("/healthcheck")
async def check_model_health(
display_name: str = Query(..., description="Display name to check"),
display_name: Annotated[str, Query(..., description="Display name to check")],
model_type: Annotated[str, Query(..., description="...")],
authorization: Optional[str] = Header(None)
):
"""Check and update model connectivity, returning the latest status.
Expand All @@ -308,7 +309,7 @@ async def check_model_health(
"""
try:
_, tenant_id = get_current_user_id(authorization)
result = await check_model_connectivity(display_name, tenant_id)
result = await check_model_connectivity(display_name, tenant_id, model_type)
return JSONResponse(status_code=HTTPStatus.OK, content={
"message": "Successfully checked model connectivity",
"data": result
Expand Down
23 changes: 12 additions & 11 deletions backend/apps/vectordatabase_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,15 @@ def create_new_index(
# Extract optional fields from request body
ingroup_permission = None
group_ids = None
embedding_model_name = None
is_multimodal = False
embedding_model_name: Optional[str] = None
if request:
ingroup_permission = request.get("ingroup_permission")
group_ids = request.get("group_ids")
embedding_model_name = request.get("embedding_model_name")
is_multimodal = request.get("is_multimodal")
embedding_model_name = request.get("embeddingModel") or request.get("embedding_model")
if isinstance(embedding_model_name, str):
embedding_model_name = embedding_model_name.strip() or None

# Treat path parameter as user-facing knowledge base name for new creations
return ElasticSearchService.create_knowledge_base(
Expand All @@ -80,7 +84,7 @@ def create_new_index(
tenant_id=tenant_id,
ingroup_permission=ingroup_permission,
group_ids=group_ids,
embedding_model_name=embedding_model_name,
is_multimodal=is_multimodal,
)
except Exception as e:
raise HTTPException(
Expand Down Expand Up @@ -124,13 +128,15 @@ async def update_index(
knowledge_name = request.get("knowledge_name")
ingroup_permission = request.get("ingroup_permission")
group_ids = request.get("group_ids")
is_multimodal = request.get("is_multimodal")

# Call service layer to update knowledge base
result = ElasticSearchService.update_knowledge_base(
index_name=index_name,
knowledge_name=knowledge_name,
ingroup_permission=ingroup_permission,
group_ids=group_ids,
is_multimodal=is_multimodal,
tenant_id=tenant_id,
user_id=user_id,
)
Expand Down Expand Up @@ -199,15 +205,10 @@ def create_index_documents(
try:
user_id, tenant_id = get_current_user_id(authorization)

# Get the knowledge base record to retrieve the saved embedding model
knowledge_record = get_knowledge_record({'index_name': index_name})
saved_embedding_model_name = None
if knowledge_record:
saved_embedding_model_name = knowledge_record.get('embedding_model_name')

# Use the saved model from knowledge base, fallback to tenant default if not set
embedding_model = get_embedding_model(tenant_id, saved_embedding_model_name)

is_multimodal = True if knowledge_record.get(
'is_multimodal') == 'Y' else False
embedding_model = get_embedding_model(tenant_id, is_multimodal)
return ElasticSearchService.index_documents(
embedding_model=embedding_model,
index_name=index_name,
Expand Down
5 changes: 5 additions & 0 deletions backend/consts/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class VectorDatabaseType(str, Enum):
# Data Processing Service Configuration
DATA_PROCESS_SERVICE = os.getenv("DATA_PROCESS_SERVICE")
CLIP_MODEL_PATH = os.getenv("CLIP_MODEL_PATH")
TABLE_TRANSFORMER_MODEL_PATH = os.getenv("TABLE_TRANSFORMER_MODEL_PATH")
UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH = os.getenv(
"UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH"
)


# Upload Configuration
Expand Down Expand Up @@ -109,6 +113,7 @@ class VectorDatabaseType(str, Enum):
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
MINIO_REGION = os.getenv("MINIO_REGION")
MINIO_DEFAULT_BUCKET = os.getenv("MINIO_DEFAULT_BUCKET")
S3_URL_PREFIX = "s3://"


# Postgres Configuration
Expand Down
1 change: 1 addition & 0 deletions backend/consts/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class ProcessParams(BaseModel):
source_type: str
index_name: str
authorization: Optional[str] = None
is_multimodal: Optional[bool] = False


class OpinionRequest(BaseModel):
Expand Down
166 changes: 129 additions & 37 deletions backend/data_process/ray_actors.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from io import BytesIO
import logging
import json
from typing import Any, Dict, List, Optional

import ray

from consts.const import RAY_ACTOR_NUM_CPUS, REDIS_BACKEND_URL, DEFAULT_EXPECTED_CHUNK_SIZE, DEFAULT_MAXIMUM_CHUNK_SIZE
from database.attachment_db import get_file_stream
from consts.const import (
RAY_ACTOR_NUM_CPUS,
REDIS_BACKEND_URL,
DEFAULT_EXPECTED_CHUNK_SIZE,
DEFAULT_MAXIMUM_CHUNK_SIZE,
TABLE_TRANSFORMER_MODEL_PATH,
UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH,
)
from database.attachment_db import build_s3_url, get_file_stream, upload_fileobj
from database.model_management_db import get_model_by_model_id
from nexent.data_process import DataProcessCore

Expand Down Expand Up @@ -58,50 +66,137 @@ def process_file(
if task_id:
params['task_id'] = task_id

# Get chunk size parameters from embedding model if model_id is provided
if model_id and tenant_id:
try:
# Get embedding model details directly by model_id
model_record = get_model_by_model_id(
model_id=model_id, tenant_id=tenant_id)
if model_record:
expected_chunk_size = model_record.get(
'expected_chunk_size', DEFAULT_EXPECTED_CHUNK_SIZE)
maximum_chunk_size = model_record.get(
'maximum_chunk_size', DEFAULT_MAXIMUM_CHUNK_SIZE)
model_name = model_record.get('display_name')

# Pass chunk sizes to processing parameters
params['max_characters'] = maximum_chunk_size
params['new_after_n_chars'] = expected_chunk_size

logger.info(
f"[RayActor] Using chunk sizes from embedding model '{model_name}' (ID: {model_id}): "
f"max_characters={maximum_chunk_size}, new_after_n_chars={expected_chunk_size}")
else:
logger.warning(
f"[RayActor] Embedding model with ID {model_id} not found for tenant '{tenant_id}', using default chunk sizes")
except Exception as e:
self._apply_model_chunk_sizes(
model_id=model_id, tenant_id=tenant_id, params=params)
self._apply_model_paths(params)
file_data = self._read_file_bytes(source)

result = self._processor.file_process(
file_data=file_data,
filename=source,
chunking_strategy=chunking_strategy,
**params
)
chunks, images_info = self._normalize_processor_result(result)
if images_info:
self._append_image_chunks(
source=source, chunks=chunks, images_info=images_info)

chunks = self._validate_chunks(chunks, source)
if not chunks:
return []

logger.info(
f"[RayActor] Processing done: produced {len(chunks)} chunks for source='{source}'")
return chunks

def _apply_model_paths(self, params: Dict[str, Any]) -> None:
    """Add the configured model-path settings to ``params`` in place.

    Args:
        params: Processing parameters. Two keys are set (overwriting any
            existing value): ``table_transformer_model_path`` and
            ``unstructured_default_model_initialize_params_json_path``,
            taken from the module-level constants of the same (upper-case)
            names.
    """
    params.update({
        "table_transformer_model_path": TABLE_TRANSFORMER_MODEL_PATH,
        "unstructured_default_model_initialize_params_json_path":
            UNSTRUCTURED_DEFAULT_MODEL_INITIALIZE_PARAMS_JSON_PATH,
    })

def _apply_model_chunk_sizes(
self,
model_id: Optional[int],
tenant_id: Optional[str],
params: Dict[str, Any],
) -> None:
if not (model_id and tenant_id):
return

try:
model_record = get_model_by_model_id(
model_id=model_id, tenant_id=tenant_id)
if not model_record:
logger.warning(
f"[RayActor] Failed to retrieve chunk sizes from embedding model ID {model_id}: {e}. Using default chunk sizes")
f"[RayActor] Embedding model with ID {model_id} not found for tenant '{tenant_id}', using default chunk sizes")
return

expected_chunk_size = model_record.get(
'expected_chunk_size', DEFAULT_EXPECTED_CHUNK_SIZE)
maximum_chunk_size = model_record.get(
'maximum_chunk_size', DEFAULT_MAXIMUM_CHUNK_SIZE)
model_name = model_record.get('display_name')
model_type = model_record.get('model_type')

params['max_characters'] = maximum_chunk_size
params['new_after_n_chars'] = expected_chunk_size
if model_type:
params['model_type'] = model_type

logger.info(
f"[RayActor] Using chunk sizes from embedding model '{model_name}' (ID: {model_id}): "
f"max_characters={maximum_chunk_size}, new_after_n_chars={expected_chunk_size}")
except Exception as e:
logger.warning(
f"[RayActor] Failed to retrieve chunk sizes from embedding model ID {model_id}: {e}. Using default chunk sizes")

def _read_file_bytes(self, source: str) -> bytes:
    """Fetch ``source`` through ``get_file_stream`` and return its contents.

    Args:
        source: Location of the file, passed unchanged to ``get_file_stream``.

    Returns:
        Whatever a single ``read()`` on the returned stream yields.

    Raises:
        FileNotFoundError: If ``get_file_stream`` returns ``None``.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        file_stream = get_file_stream(source)
        if file_stream is None:
            raise FileNotFoundError(
                f"Unable to fetch file from URL: {source}")
        # Read exactly once: a second read() on an already-consumed stream
        # would only return what is left (normally nothing).
        # NOTE(review): the stream is not closed here; confirm whether the
        # object returned by get_file_stream needs an explicit close.
        return file_stream.read()
    except Exception as e:
        logger.error(f"Failed to fetch file from {source}: {e}")
        raise

chunks = self._processor.file_process(
file_data=file_data,
filename=source,
chunking_strategy=chunking_strategy,
**params
)
def _normalize_processor_result(
self, result: Any
) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
if isinstance(result, tuple) and len(result) == 2:
chunks, images_info = result
return chunks or [], images_info or []
return result or [], []

def _append_image_chunks(
self,
source: str,
chunks: List[Dict[str, Any]],
images_info: List[Dict[str, Any]],
) -> None:
folder = "images_in_attachments"
for index, image_data in enumerate(images_info):
if not isinstance(image_data, dict):
logger.warning(
f"[RayActor] Skipping image entry at index {index}: unexpected type {type(image_data)}"
)
continue
if "image_bytes" not in image_data:
logger.warning(
f"[RayActor] Skipping image entry at index {index}: missing image_bytes"
)
continue

img_obj = BytesIO(image_data["image_bytes"])
result = upload_fileobj(
file_obj=img_obj,
file_name=f"{index}.{image_data['image_format']}",
prefix=folder)
image_url = build_s3_url(result.get("object_name", ""))

image_data["source_file"] = source
image_data["image_url"] = image_url

chunks.append({
"content": json.dumps({
"source_file": source,
"position": image_data["position"],
"image_url": image_url,
}),
"filename": source,
"metadata": {
"chunk_index": len(chunks) + index,
"process_source": "UniversalImageExtractor",
"image_url": image_url,
}
})

def _validate_chunks(
self, chunks: Any, source: str
) -> List[Dict[str, Any]]:
if chunks is None:
logger.warning(
f"[RayActor] file_process returned None for source='{source}'")
Expand All @@ -114,9 +209,6 @@ def process_file(
logger.warning(
f"[RayActor] file_process returned empty list for source='{source}'")
return []

logger.info(
f"[RayActor] Processing done: produced {len(chunks)} chunks for source='{source}'")
return chunks

def store_chunks_in_redis(self, redis_key: str, chunks: List[Dict[str, Any]]) -> bool:
Expand Down
Loading