diff --git a/backend/alembic/versions/a3c1a7904cd0_remove_userfile_related_deprecated_.py b/backend/alembic/versions/a3c1a7904cd0_remove_userfile_related_deprecated_.py new file mode 100644 index 00000000000..f2f2cb4f3c8 --- /dev/null +++ b/backend/alembic/versions/a3c1a7904cd0_remove_userfile_related_deprecated_.py @@ -0,0 +1,39 @@ +"""remove userfile related deprecated fields + +Revision ID: a3c1a7904cd0 +Revises: 5c3dca366b35 +Create Date: 2026-01-06 13:00:30.634396 + +""" + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "a3c1a7904cd0" +down_revision = "5c3dca366b35" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.drop_column("user_file", "document_id") + op.drop_column("user_file", "document_id_migrated") + op.drop_column("connector_credential_pair", "is_user_file") + + +def downgrade() -> None: + op.add_column( + "connector_credential_pair", + sa.Column("is_user_file", sa.Boolean(), nullable=False, server_default="false"), + ) + op.add_column( + "user_file", + sa.Column("document_id", sa.String(), nullable=True), + ) + op.add_column( + "user_file", + sa.Column( + "document_id_migrated", sa.Boolean(), nullable=False, server_default="true" + ), + ) diff --git a/backend/onyx/background/celery/apps/docfetching.py b/backend/onyx/background/celery/apps/docfetching.py index 9e6475051f5..022b5fee9d4 100644 --- a/backend/onyx/background/celery/apps/docfetching.py +++ b/backend/onyx/background/celery/apps/docfetching.py @@ -98,8 +98,5 @@ def on_setup_logging( celery_app.autodiscover_tasks( [ "onyx.background.celery.tasks.docfetching", - # Ensure the user files indexing worker registers the doc_id migration task - # TODO(subash): remove this once the doc_id migration is complete - "onyx.background.celery.tasks.user_file_processing", ] ) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 9fa92762ed0..303deac4b49 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -57,16 +57,6 @@ "expires": BEAT_EXPIRES_DEFAULT, }, }, - { - "name": "user-file-docid-migration", - "task": OnyxCeleryTask.USER_FILE_DOCID_MIGRATION, - "schedule": timedelta(minutes=10), - "options": { - "priority": OnyxCeleryPriority.HIGH, - "expires": BEAT_EXPIRES_DEFAULT, - "queue": OnyxCeleryQueues.USER_FILES_INDEXING, - }, - }, { "name": "check-for-kg-processing", "task": OnyxCeleryTask.CHECK_KG_PROCESSING, diff --git a/backend/onyx/background/celery/tasks/docfetching/task_creation_utils.py b/backend/onyx/background/celery/tasks/docfetching/task_creation_utils.py index 868db2c081e..2852886c572 100644 --- a/backend/onyx/background/celery/tasks/docfetching/task_creation_utils.py +++ b/backend/onyx/background/celery/tasks/docfetching/task_creation_utils.py @@ -72,15 +72,6 @@ def try_creating_docfetching_task( # Another indexing attempt is already running return None - # Determine which queue to use based on whether this is a user file - # TODO: at the moment the indexing pipeline is - # shared between user files and connectors - queue = ( - OnyxCeleryQueues.USER_FILES_INDEXING - if cc_pair.is_user_file - else OnyxCeleryQueues.CONNECTOR_DOC_FETCHING - ) - # Use higher priority for first-time indexing to ensure new connectors # get processed before re-indexing of existing connectors has_successful_attempt = cc_pair.last_successful_index_time is not None @@ -99,7 +90,7 @@ def try_creating_docfetching_task( search_settings_id=search_settings.id, tenant_id=tenant_id, ), - queue=queue, + queue=OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, task_id=custom_task_id, priority=priority, ) diff --git a/backend/onyx/background/celery/tasks/docprocessing/tasks.py b/backend/onyx/background/celery/tasks/docprocessing/tasks.py index 89108c85042..dfacfee138d 100644 --- a/backend/onyx/background/celery/tasks/docprocessing/tasks.py +++ b/backend/onyx/background/celery/tasks/docprocessing/tasks.py @@ -59,9 +59,6 @@ from onyx.db.connector_credential_pair import ( fetch_indexable_standard_connector_credential_pair_ids, ) -from onyx.db.connector_credential_pair import ( - fetch_indexable_user_file_connector_credential_pair_ids, -) from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.connector_credential_pair import set_cc_pair_repeated_error_state from onyx.db.engine.sql_engine import get_session_with_current_tenant @@ -540,12 +537,7 @@ def check_indexing_completion( ]: # User file connectors must be paused on success # NOTE: _run_indexing doesn't update connectors if the index attempt is the future embedding model - # TODO: figure out why this doesn't pause connectors during swap - cc_pair.status = ( - ConnectorCredentialPairStatus.PAUSED - if cc_pair.is_user_file - else ConnectorCredentialPairStatus.ACTIVE - ) + cc_pair.status = ConnectorCredentialPairStatus.ACTIVE db_session.commit() mt_cloud_telemetry( @@ -811,13 +803,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None: db_session, active_cc_pairs_only=True ) ) - user_file_cc_pair_ids = ( - fetch_indexable_user_file_connector_credential_pair_ids( - db_session, search_settings_id=current_search_settings.id - ) - ) - primary_cc_pair_ids = standard_cc_pair_ids + user_file_cc_pair_ids + primary_cc_pair_ids = standard_cc_pair_ids # Get CC pairs for secondary search settings secondary_cc_pair_ids: list[int] = [] @@ -833,14 +820,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None: db_session, active_cc_pairs_only=not include_paused ) ) - user_file_cc_pair_ids = ( - fetch_indexable_user_file_connector_credential_pair_ids( - db_session, search_settings_id=secondary_search_settings.id - ) - or [] - ) - secondary_cc_pair_ids = standard_cc_pair_ids + user_file_cc_pair_ids + secondary_cc_pair_ids = standard_cc_pair_ids # Flag CC pairs in repeated error state for primary/current search settings with get_session_with_current_tenant() as db_session: diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index 1fcc2ce15d4..b39778c1437 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -886,9 +886,7 @@ def monitor_celery_queues_helper( OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery ) n_docprocessing = celery_get_queue_length(OnyxCeleryQueues.DOCPROCESSING, r_celery) - n_user_files_indexing = celery_get_queue_length( - OnyxCeleryQueues.USER_FILES_INDEXING, r_celery - ) + n_user_file_processing = celery_get_queue_length( OnyxCeleryQueues.USER_FILE_PROCESSING, r_celery ) @@ -924,7 +922,6 @@ def monitor_celery_queues_helper( f"docfetching_prefetched={len(n_docfetching_prefetched)} " f"docprocessing={n_docprocessing} " f"docprocessing_prefetched={len(n_docprocessing_prefetched)} " - f"user_files_indexing={n_user_files_indexing} " f"user_file_processing={n_user_file_processing} " f"user_file_project_sync={n_user_file_project_sync} " f"user_file_delete={n_user_file_delete} " diff --git a/backend/onyx/background/celery/tasks/user_file_processing/tasks.py b/backend/onyx/background/celery/tasks/user_file_processing/tasks.py index 8135fa82512..ef59d2464ef 100644 --- a/backend/onyx/background/celery/tasks/user_file_processing/tasks.py +++ b/backend/onyx/background/celery/tasks/user_file_processing/tasks.py @@ -1,6 +1,5 @@ import datetime import time -from collections.abc import Sequence from typing import Any from uuid import UUID @@ -19,11 +18,9 @@ from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT -from onyx.configs.constants import CELERY_USER_FILE_DOCID_MIGRATION_LOCK_TIMEOUT from onyx.configs.constants import CELERY_USER_FILE_PROCESSING_LOCK_TIMEOUT from onyx.configs.constants import CELERY_USER_FILE_PROJECT_SYNC_LOCK_TIMEOUT from onyx.configs.constants import DocumentSource -from onyx.configs.constants import FileOrigin from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask @@ -32,20 +29,13 @@ from onyx.connectors.models import Document from onyx.db.engine.sql_engine import get_session_with_current_tenant from onyx.db.enums import UserFileStatus -from onyx.db.models import FileRecord -from onyx.db.models import SearchDoc from onyx.db.models import UserFile from onyx.db.search_settings import get_active_search_settings from onyx.db.search_settings import get_active_search_settings_list from onyx.document_index.factory import get_default_document_index -from onyx.document_index.interfaces import VespaDocumentFields from onyx.document_index.interfaces import VespaDocumentUserFields -from onyx.document_index.vespa.shared_utils.utils import ( - replace_invalid_doc_id_characters, -) from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT from onyx.file_store.file_store import get_default_file_store -from onyx.file_store.file_store import S3BackedFileStore from onyx.file_store.utils import user_file_id_to_plaintext_file_name from onyx.httpx.httpx_pool import HttpxPool from onyx.indexing.adapters.user_file_indexing_adapter import UserFileIndexingAdapter @@ -618,315 +608,3 @@ def process_single_user_file_project_sync( file_lock.release() return None - - -def _normalize_legacy_user_file_doc_id(old_id: str) -> str: - # Convert USER_FILE_CONNECTOR__ -> FILE_CONNECTOR__ for legacy values - user_prefix = "USER_FILE_CONNECTOR__" - file_prefix = "FILE_CONNECTOR__" - if old_id.startswith(user_prefix): - remainder = old_id[len(user_prefix) :] - return file_prefix + remainder - return old_id - - -def update_legacy_plaintext_file_records() -> None: - """Migrate legacy plaintext cache objects from int-based keys to UUID-based - keys. Copies each S3 object to its expected UUID key and updates DB. - - Examples: - - Old key: bucket/schema/plaintext_ - - New key: bucket/schema/plaintext_ - """ - - task_logger.info("update_legacy_plaintext_file_records - Starting") - - with get_session_with_current_tenant() as db_session: - store = get_default_file_store() - - if not isinstance(store, S3BackedFileStore): - task_logger.info( - "update_legacy_plaintext_file_records - Skipping non-S3 store" - ) - return - - s3_client = store._get_s3_client() - bucket_name = store._get_bucket_name() - - # Select PLAINTEXT_CACHE records whose object_key ends with 'plaintext_' + non-hyphen chars - # Example: 'some/path/plaintext_abc123' matches; '.../plaintext_foo-bar' does not - plaintext_records: Sequence[FileRecord] = ( - db_session.execute( - sa.select(FileRecord).where( - FileRecord.file_origin == FileOrigin.PLAINTEXT_CACHE, - FileRecord.object_key.op("~")(r"plaintext_[^-]+$"), - ) - ) - .scalars() - .all() - ) - - task_logger.info( - f"update_legacy_plaintext_file_records - Found {len(plaintext_records)} plaintext records to update" - ) - - normalized = 0 - for fr in plaintext_records: - try: - expected_key = store._get_s3_key(fr.file_id) - if fr.object_key == expected_key: - continue - - if fr.bucket_name is None: - task_logger.warning(f"id={fr.file_id} - Bucket name is None") - continue - - if fr.object_key is None: - task_logger.warning(f"id={fr.file_id} - Object key is None") - continue - - # Copy old object to new key - copy_source = f"{fr.bucket_name}/{fr.object_key}" - s3_client.copy_object( - CopySource=copy_source, - Bucket=bucket_name, - Key=expected_key, - MetadataDirective="COPY", - ) - - # Delete old object (best-effort) - try: - s3_client.delete_object(Bucket=fr.bucket_name, Key=fr.object_key) - except Exception: - pass - - # Update DB record with new key - fr.object_key = expected_key - db_session.add(fr) - normalized += 1 - except Exception as e: - task_logger.warning(f"id={fr.file_id} - {e.__class__.__name__}") - - if normalized: - db_session.commit() - task_logger.info( - f"user_file_docid_migration_task normalized {normalized} plaintext objects" - ) - - -@shared_task( - name=OnyxCeleryTask.USER_FILE_DOCID_MIGRATION, - ignore_result=True, - bind=True, -) -def user_file_docid_migration_task(self: Task, *, tenant_id: str) -> bool: - - task_logger.info( - f"user_file_docid_migration_task - Starting for tenant={tenant_id}" - ) - - redis_client = get_redis_client(tenant_id=tenant_id) - lock: RedisLock = redis_client.lock( - OnyxRedisLocks.USER_FILE_DOCID_MIGRATION_LOCK, - timeout=CELERY_USER_FILE_DOCID_MIGRATION_LOCK_TIMEOUT, - ) - - if not lock.acquire(blocking=False): - task_logger.info( - f"user_file_docid_migration_task - Lock held, skipping tenant={tenant_id}" - ) - return False - - updated_count = 0 - try: - update_legacy_plaintext_file_records() - # Track lock renewal - last_lock_time = time.monotonic() - with get_session_with_current_tenant() as db_session: - - # 20 is the documented default for httpx max_keepalive_connections - if MANAGED_VESPA: - httpx_init_vespa_pool( - 20, ssl_cert=VESPA_CLOUD_CERT_PATH, ssl_key=VESPA_CLOUD_KEY_PATH - ) - else: - httpx_init_vespa_pool(20) - - active_settings = get_active_search_settings(db_session) - document_index = get_default_document_index( - search_settings=active_settings.primary, - secondary_search_settings=active_settings.secondary, - httpx_client=HttpxPool.get("vespa"), - ) - - retry_index = RetryDocumentIndex(document_index) - - # Select user files with a legacy doc id that have not been migrated - user_files = ( - db_session.execute( - sa.select(UserFile).where( - sa.and_( - UserFile.document_id.is_not(None), - UserFile.document_id_migrated.is_(False), - ) - ) - ) - .scalars() - .all() - ) - - task_logger.info( - f"user_file_docid_migration_task - Found {len(user_files)} user files to migrate" - ) - - # Query all SearchDocs that need updating - search_docs = ( - db_session.execute( - sa.select(SearchDoc).where( - SearchDoc.document_id.like("%FILE_CONNECTOR__%") - ) - ) - .scalars() - .all() - ) - task_logger.info( - f"user_file_docid_migration_task - Found {len(search_docs)} search docs to update" - ) - - # Build a map of normalized doc IDs to SearchDocs - search_doc_map: dict[str, list[SearchDoc]] = {} - for sd in search_docs: - doc_id = sd.document_id - if search_doc_map.get(doc_id) is None: - search_doc_map[doc_id] = [] - search_doc_map[doc_id].append(sd) - - task_logger.debug( - f"user_file_docid_migration_task - Built search doc map with {len(search_doc_map)} entries" - ) - - ids_preview = list(search_doc_map.keys())[:5] - task_logger.debug( - f"user_file_docid_migration_task - First few search_doc_map ids: {ids_preview if ids_preview else 'No ids found'}" - ) - task_logger.debug( - f"user_file_docid_migration_task - search_doc_map total items: " - f"{sum(len(docs) for docs in search_doc_map.values())}" - ) - for user_file in user_files: - # Periodically renew the Redis lock to prevent expiry mid-run - current_time = time.monotonic() - if current_time - last_lock_time >= ( - CELERY_USER_FILE_DOCID_MIGRATION_LOCK_TIMEOUT / 4 - ): - renewed = False - try: - # extend lock ttl to full timeout window - lock.extend(CELERY_USER_FILE_DOCID_MIGRATION_LOCK_TIMEOUT) - renewed = True - except Exception: - # if extend fails, best-effort reacquire as a fallback - try: - lock.reacquire() - renewed = True - except Exception: - renewed = False - last_lock_time = current_time - if not renewed or not lock.owned(): - task_logger.error( - "user_file_docid_migration_task - Lost lock ownership or failed to renew; aborting for safety" - ) - return False - - try: - clean_old_doc_id = replace_invalid_doc_id_characters( - user_file.document_id - ) - normalized_doc_id = _normalize_legacy_user_file_doc_id( - clean_old_doc_id - ) - user_project_ids = [project.id for project in user_file.projects] - task_logger.info( - f"user_file_docid_migration_task - Migrating user file {user_file.id} with doc_id {normalized_doc_id}" - ) - - index_name = active_settings.primary.index_name - - # First find the chunks count using direct Vespa query - selection = f"{index_name}.document_id=='{normalized_doc_id}'" - - # Count all chunks for this document - chunk_count = _get_document_chunk_count( - index_name=index_name, - selection=selection, - ) - - task_logger.info( - f"Found {chunk_count} chunks for document {normalized_doc_id}" - ) - - # Now update Vespa chunks with the found chunk count using retry_index - # WARNING: In the future this will error; we no longer want - # to support changing document ID. - # TODO(andrei): Delete soon. - retry_index.update_single( - doc_id=str(normalized_doc_id), - tenant_id=tenant_id, - chunk_count=chunk_count, - fields=VespaDocumentFields(document_id=str(user_file.id)), - user_fields=VespaDocumentUserFields( - user_projects=user_project_ids - ), - ) - user_file.chunk_count = chunk_count - - # Update the SearchDocs - actual_doc_id = str(user_file.document_id) - normalized_actual_doc_id = _normalize_legacy_user_file_doc_id( - actual_doc_id - ) - if ( - normalized_doc_id in search_doc_map - or normalized_actual_doc_id in search_doc_map - ): - to_update = ( - search_doc_map[normalized_doc_id] - if normalized_doc_id in search_doc_map - else search_doc_map[normalized_actual_doc_id] - ) - task_logger.debug( - f"user_file_docid_migration_task - Updating {len(to_update)} search docs for user file {user_file.id}" - ) - for search_doc in to_update: - search_doc.document_id = str(user_file.id) - db_session.add(search_doc) - - user_file.document_id_migrated = True - db_session.add(user_file) - db_session.commit() - updated_count += 1 - except Exception as per_file_exc: - # Rollback the current transaction and continue with the next file - db_session.rollback() - task_logger.exception( - f"user_file_docid_migration_task - Error migrating user file {user_file.id} - " - f"{per_file_exc.__class__.__name__}" - ) - - task_logger.info( - f"user_file_docid_migration_task - Updated {updated_count} user files" - ) - - task_logger.info( - f"user_file_docid_migration_task - Completed for tenant={tenant_id} (updated={updated_count})" - ) - return True - except Exception as e: - task_logger.exception( - f"user_file_docid_migration_task - Error during execution for tenant={tenant_id} " - f"(updated={updated_count}) exception={e.__class__.__name__}" - ) - return False - finally: - if lock.owned(): - lock.release() diff --git a/backend/onyx/configs/app_configs.py b/backend/onyx/configs/app_configs.py index 4b1dfaa3a47..2a1a2f2ef1e 100644 --- a/backend/onyx/configs/app_configs.py +++ b/backend/onyx/configs/app_configs.py @@ -679,10 +679,6 @@ def get_current_tz_offset() -> int: os.environ.get("INDEXING_EMBEDDING_MODEL_NUM_THREADS") or 8 ) -# Maximum number of user file connector credential pairs to index in a single batch -# Setting this number too high may overload the indexing process -USER_FILE_INDEXING_LIMIT = int(os.environ.get("USER_FILE_INDEXING_LIMIT") or 100) - # Maximum file size in a document to be indexed MAX_DOCUMENT_CHARS = int(os.environ.get("MAX_DOCUMENT_CHARS") or 5_000_000) MAX_FILE_SIZE_BYTES = int( diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 25b6a3db831..e59e6df3545 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -146,9 +146,6 @@ CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT = 300 # 5 min -# Doc ID migration can be long-running; use a longer TTL and renew periodically -CELERY_USER_FILE_DOCID_MIGRATION_LOCK_TIMEOUT = 10 * 60 # 10 minutes (in seconds) - CELERY_USER_FILE_PROCESSING_LOCK_TIMEOUT = 30 * 60 # 30 minutes (in seconds) CELERY_USER_FILE_PROJECT_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds) @@ -366,9 +363,6 @@ class OnyxCeleryQueues: CONNECTOR_EXTERNAL_GROUP_SYNC = "connector_external_group_sync" CSV_GENERATION = "csv_generation" - # Indexing queue - USER_FILES_INDEXING = "user_files_indexing" - # User file processing queue USER_FILE_PROCESSING = "user_file_processing" USER_FILE_PROJECT_SYNC = "user_file_project_sync" @@ -427,7 +421,6 @@ class OnyxRedisLocks: USER_FILE_PROJECT_SYNC_LOCK_PREFIX = "da_lock:user_file_project_sync" USER_FILE_DELETE_BEAT_LOCK = "da_lock:check_user_file_delete_beat" USER_FILE_DELETE_LOCK_PREFIX = "da_lock:user_file_delete" - USER_FILE_DOCID_MIGRATION_LOCK = "da_lock:user_file_docid_migration" class OnyxRedisSignals: @@ -534,7 +527,6 @@ class OnyxCeleryTask: CONNECTOR_PRUNING_GENERATOR_TASK = "connector_pruning_generator_task" DOCUMENT_BY_CC_PAIR_CLEANUP_TASK = "document_by_cc_pair_cleanup_task" VESPA_METADATA_SYNC_TASK = "vespa_metadata_sync_task" - USER_FILE_DOCID_MIGRATION = "user_file_docid_migration" # chat retention CHECK_TTL_MANAGEMENT_TASK = "check_ttl_management_task" diff --git a/backend/onyx/db/connector_credential_pair.py b/backend/onyx/db/connector_credential_pair.py index 3df07597202..664f3200db5 100644 --- a/backend/onyx/db/connector_credential_pair.py +++ b/backend/onyx/db/connector_credential_pair.py @@ -6,11 +6,8 @@ from sqlalchemy import delete from sqlalchemy import desc from sqlalchemy import exists -from sqlalchemy import lateral -from sqlalchemy import or_ from sqlalchemy import Select from sqlalchemy import select -from sqlalchemy import true from sqlalchemy import update from sqlalchemy.orm import aliased from sqlalchemy.orm import joinedload @@ -19,7 +16,6 @@ from onyx.configs.app_configs import AUTH_TYPE from onyx.configs.app_configs import DISABLE_AUTH -from onyx.configs.app_configs import USER_FILE_INDEXING_LIMIT from onyx.configs.constants import AuthType from onyx.configs.constants import DocumentSource from onyx.db.connector import fetch_connector_by_id @@ -120,7 +116,6 @@ def get_connector_credential_pairs_for_user( eager_load_connector: bool = False, eager_load_credential: bool = False, eager_load_user: bool = False, - include_user_files: bool = False, order_by_desc: bool = False, source: DocumentSource | None = None, ) -> list[ConnectorCredentialPair]: @@ -149,9 +144,6 @@ def get_connector_credential_pairs_for_user( if ids: stmt = stmt.where(ConnectorCredentialPair.id.in_(ids)) - if not include_user_files: - stmt = stmt.where(ConnectorCredentialPair.is_user_file.is_(False)) - if order_by_desc: stmt = stmt.order_by(desc(ConnectorCredentialPair.id)) @@ -186,16 +178,13 @@ def get_connector_credential_pairs_for_user_parallel( def get_connector_credential_pairs( - db_session: Session, ids: list[int] | None = None, include_user_files: bool = False + db_session: Session, ids: list[int] | None = None ) -> list[ConnectorCredentialPair]: stmt = select(ConnectorCredentialPair).distinct() if ids: stmt = stmt.where(ConnectorCredentialPair.id.in_(ids)) - if not include_user_files: - stmt = stmt.where(ConnectorCredentialPair.is_user_file != True) # noqa: E712 - return list(db_session.scalars(stmt).all()) @@ -242,15 +231,12 @@ def get_connector_credential_pair_for_user( connector_id: int, credential_id: int, user: User | None, - include_user_files: bool = False, get_editable: bool = True, ) -> ConnectorCredentialPair | None: stmt = select(ConnectorCredentialPair) stmt = _add_user_filters(stmt, user, get_editable) stmt = stmt.where(ConnectorCredentialPair.connector_id == connector_id) stmt = stmt.where(ConnectorCredentialPair.credential_id == credential_id) - if not include_user_files: - stmt = stmt.where(ConnectorCredentialPair.is_user_file != True) # noqa: E712 result = db_session.execute(stmt) return result.scalar_one_or_none() @@ -377,8 +363,6 @@ def _update_connector_credential_pair( cc_pair.total_docs_indexed += net_docs if status is not None: cc_pair.status = status - if cc_pair.is_user_file: - cc_pair.status = ConnectorCredentialPairStatus.PAUSED db_session.commit() @@ -536,7 +520,6 @@ def add_credential_to_connector( initial_status: ConnectorCredentialPairStatus = ConnectorCredentialPairStatus.SCHEDULED, last_successful_index_time: datetime | None = None, seeding_flow: bool = False, - is_user_file: bool = False, ) -> StatusResponse: connector = fetch_connector_by_id(connector_id, db_session) @@ -602,7 +585,6 @@ def add_credential_to_connector( access_type=access_type, auto_sync_options=auto_sync_options, last_successful_index_time=last_successful_index_time, - is_user_file=is_user_file, ) db_session.add(association) db_session.flush() # make sure the association has an id @@ -699,67 +681,12 @@ def fetch_indexable_standard_connector_credential_pair_ids( ) ) - # Exclude user files. NOTE: some cc pairs have null for is_user_file instead of False - stmt = stmt.where(ConnectorCredentialPair.is_user_file.is_not(True)) - if limit: stmt = stmt.limit(limit) return list(db_session.scalars(stmt)) -def fetch_indexable_user_file_connector_credential_pair_ids( - db_session: Session, - search_settings_id: int, - limit: int | None = USER_FILE_INDEXING_LIMIT, -) -> list[int]: - """ - Return up to `limit` user file connector_credential_pair IDs that still - need indexing for the given `search_settings_id` - - A cc_pair is considered "needs indexing" if its most recent IndexAttempt - for this search_settings_id is either: - - Missing entirely (no attempts yet) - - Present but not SUCCESS status - - Implementation details: - - Uses a LEFT JOIN LATERAL subquery to fetch only the single newest attempt - per cc_pair (`ORDER BY time_updated DESC LIMIT 1`), instead of joining all - attempts. This avoids scanning thousands of historical attempts and - keeps memory/CPU usage low - - `ON TRUE` is required in the lateral join because the correlation to - ConnectorCredentialPair.id happens inside the subquery itself - - NOTE: Shares some redundant logic with should_index() (TODO: combine) - - Returns: - list[int]: connector_credential_pair IDs that should be indexed next - """ - latest_attempt = lateral( - select(IndexAttempt.status) - .where( - IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id, - IndexAttempt.search_settings_id == search_settings_id, - ) - .order_by(IndexAttempt.time_updated.desc()) - .limit(1) - ).alias("latest_attempt") - - stmt = ( - select(ConnectorCredentialPair.id) - .outerjoin(latest_attempt, true()) # ON TRUE, Postgres-style lateral join - .where( - ConnectorCredentialPair.is_user_file.is_(True), - or_( - latest_attempt.c.status.is_(None), # no attempts at all - latest_attempt.c.status != IndexingStatus.SUCCESS, # latest != SUCCESS - ), - ) - .limit(limit) # Always apply a limit when fetching user file cc pairs - ) - - return list(db_session.scalars(stmt)) - - def fetch_connector_credential_pair_for_connector( db_session: Session, connector_id: int, diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index bb18d5a0cf5..aa01371ec3f 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -534,7 +534,6 @@ class ConnectorCredentialPair(Base): """ __tablename__ = "connector_credential_pair" - is_user_file: Mapped[bool] = mapped_column(Boolean, default=False) # NOTE: this `id` column has to use `Sequence` instead of `autoincrement=True` # due to some SQLAlchemy quirks + this not being a primary key column id: Mapped[int] = mapped_column( @@ -3571,9 +3570,6 @@ class UserFile(Base): back_populates="user_files", ) file_id: Mapped[str] = mapped_column(nullable=False) - document_id: Mapped[str] = mapped_column( - nullable=False - ) # TODO(subash): legacy document_id, will be removed in a future migration name: Mapped[str] = mapped_column(nullable=False) created_at: Mapped[datetime.datetime] = mapped_column( default=datetime.datetime.utcnow @@ -3601,9 +3597,6 @@ class UserFile(Base): link_url: Mapped[str | None] = mapped_column(String, nullable=True) content_type: Mapped[str | None] = mapped_column(String, nullable=True) - document_id_migrated: Mapped[bool] = mapped_column( - Boolean, nullable=False, default=True - ) projects: Mapped[list["UserProject"]] = relationship( "UserProject", diff --git a/backend/onyx/db/projects.py b/backend/onyx/db/projects.py index ac1cd3f778f..f1d0fb904c6 100644 --- a/backend/onyx/db/projects.py +++ b/backend/onyx/db/projects.py @@ -72,7 +72,6 @@ def create_user_files( id=new_id, user_id=user.id if user else None, file_id=file_path, - document_id=str(new_id), name=file.filename, token_count=categorized_files.acceptable_file_to_token_count[ file.filename or "" diff --git a/backend/onyx/db/swap_index.py b/backend/onyx/db/swap_index.py index 06149cbc96c..2034e63e01a 100644 --- a/backend/onyx/db/swap_index.py +++ b/backend/onyx/db/swap_index.py @@ -126,7 +126,7 @@ def check_and_perform_index_swap(db_session: Session) -> SearchSettings | None: did change. """ # Default CC-pair created for Ingestion API unused here - all_cc_pairs = get_connector_credential_pairs(db_session, include_user_files=True) + all_cc_pairs = get_connector_credential_pairs(db_session) cc_pair_count = max(len(all_cc_pairs) - 1, 0) new_search_settings = get_secondary_search_settings(db_session) diff --git a/backend/onyx/document_index/interfaces.py b/backend/onyx/document_index/interfaces.py index 8f5ff736431..c2cb4fe8d68 100644 --- a/backend/onyx/document_index/interfaces.py +++ b/backend/onyx/document_index/interfaces.py @@ -109,10 +109,6 @@ class VespaDocumentFields: hidden: bool | None = None aggregated_chunk_boost_factor: float | None = None - # document_id is added for migration purposes, ideally we should not be updating this field - # TODO(subash): remove this field in a future migration - document_id: str | None = None - @dataclass class VespaDocumentUserFields: diff --git a/backend/onyx/document_index/interfaces_new.py b/backend/onyx/document_index/interfaces_new.py index 75dd8cdaaf4..e99f8357472 100644 --- a/backend/onyx/document_index/interfaces_new.py +++ b/backend/onyx/document_index/interfaces_new.py @@ -282,10 +282,6 @@ class Updatable(abc.ABC): def update( self, update_requests: list[MetadataUpdateRequest], - # TODO(andrei), WARNING: Very temporary, this is not the interface we want - # in Updatable, we only have this to continue supporting - # user_file_docid_migration_task for Vespa which should be done soon. - old_doc_id_to_new_doc_id: dict[str, str], ) -> None: """ Updates some set of chunks. The document and fields to update are specified in the update diff --git a/backend/onyx/document_index/opensearch/opensearch_document_index.py b/backend/onyx/document_index/opensearch/opensearch_document_index.py index 9787fb45477..cf155d07ae2 100644 --- a/backend/onyx/document_index/opensearch/opensearch_document_index.py +++ b/backend/onyx/document_index/opensearch/opensearch_document_index.py @@ -308,7 +308,7 @@ def update_single( ), ) - return self._real_index.update([update_request], old_doc_id_to_new_doc_id={}) + return self._real_index.update([update_request]) def update( self, @@ -516,10 +516,6 @@ def delete(self, document_id: str, chunk_count: int | None = None) -> int: def update( self, update_requests: list[MetadataUpdateRequest], - # TODO(andrei), WARNING: Very temporary, this is not the interface we - # want in Updatable, we only have this to continue supporting - # user_file_docid_migration_task for Vespa which should be done soon. - old_doc_id_to_new_doc_id: dict[str, str], ) -> None: logger.info("[ANDREI]: Updating documents...") # TODO(andrei): This needs to be implemented. I explicitly do not raise diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py index 248ec34d37d..831aa4ec887 100644 --- a/backend/onyx/document_index/vespa/index.py +++ b/backend/onyx/document_index/vespa/index.py @@ -686,12 +686,7 @@ def update_single( project_ids=project_ids, ) - old_doc_id_to_new_doc_id: dict[str, str] = dict() - if fields is not None and fields.document_id is not None: - old_doc_id_to_new_doc_id[doc_id] = fields.document_id - vespa_document_index.update( - [update_request], old_doc_id_to_new_doc_id=old_doc_id_to_new_doc_id - ) + vespa_document_index.update([update_request]) def delete_single( self, diff --git a/backend/onyx/document_index/vespa/vespa_document_index.py b/backend/onyx/document_index/vespa/vespa_document_index.py index 16238f6d1cb..60c5f77d25e 100644 --- a/backend/onyx/document_index/vespa/vespa_document_index.py +++ b/backend/onyx/document_index/vespa/vespa_document_index.py @@ -215,7 +215,6 @@ def _update_single_chunk( doc_id: str, http_client: httpx.Client, update_request: MetadataUpdateRequest, - new_doc_id: str | None, ) -> None: """Updates a single document chunk in Vespa. @@ -251,11 +250,6 @@ class _UserProjects(BaseModel): model_config = {"frozen": True} assign: list[int] - # TODO(andrei): Very temporary, delete soon. - class _DocumentId(BaseModel): - model_config = {"frozen": True} - assign: str - class _VespaPutFields(BaseModel): model_config = {"frozen": True} # The names of these fields are based the Vespa schema. Changes to the @@ -266,8 +260,6 @@ class _VespaPutFields(BaseModel): access_control_list: _AccessControl | None = None hidden: _Hidden | None = None user_project: _UserProjects | None = None - # TODO(andrei): Very temporary, delete soon. - document_id: _DocumentId | None = None class _VespaPutRequest(BaseModel): model_config = {"frozen": True} @@ -302,10 +294,6 @@ class _VespaPutRequest(BaseModel): if update_request.project_ids is not None else None ) - # TODO(andrei): Very temporary, delete soon. - document_id_update: _DocumentId | None = ( - _DocumentId(assign=new_doc_id) if new_doc_id is not None else None - ) vespa_put_fields = _VespaPutFields( boost=boost_update, @@ -313,8 +301,6 @@ class _VespaPutRequest(BaseModel): access_control_list=access_update, hidden=hidden_update, user_project=user_projects_update, - # TODO(andrei): Very temporary, delete soon. - document_id=document_id_update, ) vespa_put_request = _VespaPutRequest( @@ -540,10 +526,6 @@ def delete(self, document_id: str, chunk_count: int | None = None) -> int: def update( self, update_requests: list[MetadataUpdateRequest], - # TODO(andrei), WARNING: Very temporary, this is not the interface we want - # in Updatable, we only have this to continue supporting - # user_file_docid_migration_task for Vespa which should be done soon. - old_doc_id_to_new_doc_id: dict[str, str], ) -> None: # WARNING: This method can be called by vespa_metadata_sync_task, which # is kicked off by check_for_vespa_sync_task, notably before a document @@ -584,8 +566,6 @@ def update( doc_id, httpx_client, update_request, - # NOTE: The key is the raw ID, not the sanitized ID. - new_doc_id=old_doc_id_to_new_doc_id.get(doc_id, None), ) logger.info( diff --git a/backend/onyx/server/documents/connector.py b/backend/onyx/server/documents/connector.py index d7b36c02f68..de6f8a599d1 100644 --- a/backend/onyx/server/documents/connector.py +++ b/backend/onyx/server/documents/connector.py @@ -1791,7 +1791,6 @@ def trigger_indexing_for_cc_pair( from_beginning: bool, tenant_id: str, db_session: Session, - is_user_file: bool = False, ) -> int: try: possible_credential_ids = get_connector_credential_ids(connector_id, db_session) @@ -1855,8 +1854,9 @@ def trigger_indexing_for_cc_pair( f"indexing_trigger={indexing_mode}" ) + priority = OnyxCeleryPriority.HIGH + # run the beat task to pick up the triggers immediately - priority = OnyxCeleryPriority.HIGHEST if is_user_file else OnyxCeleryPriority.HIGH logger.info(f"Sending indexing check task with priority {priority}") client_app.send_task( OnyxCeleryTask.CHECK_FOR_INDEXING, diff --git a/backend/onyx/server/documents/models.py b/backend/onyx/server/documents/models.py index 6bbdf5a1324..cfbe8194fdf 100644 --- a/backend/onyx/server/documents/models.py +++ b/backend/onyx/server/documents/models.py @@ -133,7 +133,6 @@ class CredentialBase(BaseModel): name: str | None = None curator_public: bool = False groups: list[int] = Field(default_factory=list) - is_user_file: bool = False class CredentialSnapshot(CredentialBase): diff --git a/backend/scripts/dev_run_background_jobs.py b/backend/scripts/dev_run_background_jobs.py index f30fdd413de..65ebd3517e0 100644 --- a/backend/scripts/dev_run_background_jobs.py +++ b/backend/scripts/dev_run_background_jobs.py @@ -71,7 +71,7 @@ def run_jobs() -> None: "--prefetch-multiplier=1", "--loglevel=INFO", "--hostname=docfetching@%n", - "--queues=connector_doc_fetching,user_files_indexing", + "--queues=connector_doc_fetching", ] cmd_beat = [ diff --git a/backend/scripts/tenant_cleanup/on_pod_scripts/get_tenant_connectors.py b/backend/scripts/tenant_cleanup/on_pod_scripts/get_tenant_connectors.py index 34c9a85895d..c0e69688388 100644 --- a/backend/scripts/tenant_cleanup/on_pod_scripts/get_tenant_connectors.py +++ b/backend/scripts/tenant_cleanup/on_pod_scripts/get_tenant_connectors.py @@ -50,10 +50,8 @@ def get_tenant_connectors(tenant_id: str) -> dict: ) with get_session_with_tenant(tenant_id=tenant_id) as db_session: - # Get all connector credential pairs, excluding user files - stmt = select(ConnectorCredentialPair).where( - ConnectorCredentialPair.is_user_file.is_(False) - ) + # Get all connector credential pairs + stmt = select(ConnectorCredentialPair) cc_pairs = db_session.execute(stmt).scalars().all() connectors = [ diff --git a/backend/supervisord.conf b/backend/supervisord.conf index 9aca6efaa20..8c264f9294a 100644 --- a/backend/supervisord.conf +++ b/backend/supervisord.conf @@ -62,7 +62,7 @@ autostart=%(ENV_USE_SEPARATE_BACKGROUND_WORKERS)s command=celery -A onyx.background.celery.versioned_apps.background worker --loglevel=INFO --hostname=background@%%n - -Q vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup,index_attempt_cleanup,docprocessing,connector_doc_fetching,user_files_indexing,connector_pruning,connector_doc_permissions_sync,connector_external_group_sync,csv_generation,kg_processing,monitoring,user_file_processing,user_file_project_sync + -Q vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup,index_attempt_cleanup,docprocessing,connector_doc_fetching,connector_pruning,connector_doc_permissions_sync,connector_external_group_sync,csv_generation,kg_processing,monitoring,user_file_processing,user_file_project_sync stdout_logfile=/var/log/celery_worker_background.log stdout_logfile_maxbytes=16MB redirect_stderr=true @@ -113,20 +113,6 @@ startsecs=10 stopasgroup=true autostart=%(ENV_USE_SEPARATE_BACKGROUND_WORKERS)s -# Standard mode: User files indexing worker -[program:celery_worker_user_files_indexing] -command=celery -A onyx.background.celery.versioned_apps.docfetching worker - --loglevel=INFO - --hostname=user_files_indexing@%%n - -Q user_files_indexing -stdout_logfile=/var/log/celery_worker_user_files_indexing.log -stdout_logfile_maxbytes=16MB -redirect_stderr=true -autorestart=true -startsecs=10 -stopasgroup=true -autostart=%(ENV_USE_SEPARATE_BACKGROUND_WORKERS)s - # Standard mode: Document fetching worker [program:celery_worker_docfetching] command=celery -A onyx.background.celery.versioned_apps.docfetching worker @@ -218,7 +204,6 @@ command=tail -qF /var/log/celery_worker_kg_processing.log /var/log/celery_worker_monitoring.log /var/log/celery_worker_user_file_processing.log - /var/log/celery_worker_user_files_indexing.log /var/log/celery_worker_docfetching.log /var/log/slack_bot.log /var/log/supervisord_watchdog_celery_beat.log diff --git a/backend/tests/external_dependency_unit/celery/test_docfetching_priority.py b/backend/tests/external_dependency_unit/celery/test_docfetching_priority.py index 0fbf365ed52..731c1028b0f 100644 --- a/backend/tests/external_dependency_unit/celery/test_docfetching_priority.py +++ b/backend/tests/external_dependency_unit/celery/test_docfetching_priority.py @@ -382,61 +382,3 @@ def test_lock_released_after_successful_task_creation( # Both calls should have submitted tasks mock_celery_app.send_task.assert_called_once() - - @patch( - "onyx.background.celery.tasks.docfetching.task_creation_utils.IndexingCoordination.try_create_index_attempt" - ) - def test_user_file_connector_uses_correct_queue( - self, - mock_try_create_index_attempt: MagicMock, - db_session: Session, - ) -> None: - """ - Test that user file connectors use the USER_FILES_INDEXING queue. - """ - from onyx.configs.constants import OnyxCeleryQueues - - unique_suffix = uuid4().hex[:8] - - connector = _create_test_connector( - db_session, f"test_connector_userfile_{unique_suffix}" - ) - credential = _create_test_credential(db_session) - cc_pair = _create_test_cc_pair( - db_session, - connector, - credential, - ConnectorCredentialPairStatus.INITIAL_INDEXING, - name=f"test_cc_pair_userfile_{unique_suffix}", - ) - # Mark as user file - cc_pair.is_user_file = True - db_session.commit() - db_session.refresh(cc_pair) - - search_settings = _create_test_search_settings( - db_session, f"test_index_userfile_{unique_suffix}" - ) - - mock_try_create_index_attempt.return_value = 12345 - mock_celery_app = MagicMock() - mock_celery_app.send_task.return_value = MagicMock() - - redis_client = get_redis_client(tenant_id=TEST_TENANT_ID) - - result = try_creating_docfetching_task( - celery_app=mock_celery_app, - cc_pair=cc_pair, - search_settings=search_settings, - reindex=False, - db_session=db_session, - r=redis_client, - tenant_id=TEST_TENANT_ID, - ) - - assert result == 12345 - mock_celery_app.send_task.assert_called_once() - call_kwargs = mock_celery_app.send_task.call_args - assert call_kwargs.kwargs["queue"] == OnyxCeleryQueues.USER_FILES_INDEXING - # User files with no last_successful_index_time should get HIGH priority - assert call_kwargs.kwargs["priority"] == OnyxCeleryPriority.HIGH diff --git a/deployment/helm/charts/onyx/Chart.yaml b/deployment/helm/charts/onyx/Chart.yaml index c7e73942d66..98a44b742d9 100644 --- a/deployment/helm/charts/onyx/Chart.yaml +++ b/deployment/helm/charts/onyx/Chart.yaml @@ -5,7 +5,7 @@ home: https://www.onyx.app/ sources: - "https://github.com/onyx-dot-app/onyx" type: application -version: 0.4.17 +version: 0.4.18 appVersion: latest annotations: category: Productivity diff --git a/deployment/helm/charts/onyx/templates/celery-worker-docfetching.yaml b/deployment/helm/charts/onyx/templates/celery-worker-docfetching.yaml index a8226852e59..ccf71320b8a 100644 --- a/deployment/helm/charts/onyx/templates/celery-worker-docfetching.yaml +++ b/deployment/helm/charts/onyx/templates/celery-worker-docfetching.yaml @@ -71,7 +71,7 @@ spec: "--loglevel=INFO", "--hostname=docfetching@%n", "-Q", - "connector_doc_fetching,user_files_indexing", + "connector_doc_fetching", ] resources: {{- toYaml .Values.celery_worker_docfetching.resources | nindent 12 }} diff --git a/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing-hpa.yaml b/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing-hpa.yaml deleted file mode 100644 index 8dbf87e1af5..00000000000 --- a/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing-hpa.yaml +++ /dev/null @@ -1,32 +0,0 @@ -{{- if and (.Values.celery_worker_user_files_indexing.autoscaling.enabled) (ne (include "onyx.autoscaling.engine" .) "keda") }} -apiVersion: autoscaling/v2 -kind: HorizontalPodAutoscaler -metadata: - name: {{ include "onyx.fullname" . }}-celery-worker-user-files-indexing - labels: - {{- include "onyx.labels" . | nindent 4 }} -spec: - scaleTargetRef: - apiVersion: apps/v1 - kind: Deployment - name: {{ include "onyx.fullname" . }}-celery-worker-user-files-indexing - minReplicas: {{ .Values.celery_worker_user_files_indexing.autoscaling.minReplicas }} - maxReplicas: {{ .Values.celery_worker_user_files_indexing.autoscaling.maxReplicas }} - metrics: - {{- if .Values.celery_worker_user_files_indexing.autoscaling.targetCPUUtilizationPercentage }} - - type: Resource - resource: - name: cpu - target: - type: Utilization - averageUtilization: {{ .Values.celery_worker_user_files_indexing.autoscaling.targetCPUUtilizationPercentage }} - {{- end }} - {{- if .Values.celery_worker_user_files_indexing.autoscaling.targetMemoryUtilizationPercentage }} - - type: Resource - resource: - name: memory - target: - type: Utilization - averageUtilization: {{ .Values.celery_worker_user_files_indexing.autoscaling.targetMemoryUtilizationPercentage }} - {{- end }} -{{- end }} diff --git a/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing-scaledobject.yaml b/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing-scaledobject.yaml deleted file mode 100644 index 3f3f78a6757..00000000000 --- a/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing-scaledobject.yaml +++ /dev/null @@ -1,37 +0,0 @@ -{{- if and (.Values.celery_worker_user_files_indexing.autoscaling.enabled) (eq (include "onyx.autoscaling.engine" .) "keda") }} -apiVersion: keda.sh/v1alpha1 -kind: ScaledObject -metadata: - name: {{ include "onyx.fullname" . }}-celery-worker-user-files-indexing - labels: - {{- include "onyx.labels" . | nindent 4 }} -spec: - scaleTargetRef: - apiVersion: apps/v1 - kind: Deployment - name: {{ include "onyx.fullname" . }}-celery-worker-user-files-indexing - minReplicaCount: {{ .Values.celery_worker_user_files_indexing.autoscaling.minReplicas | default 1 }} - maxReplicaCount: {{ .Values.celery_worker_user_files_indexing.autoscaling.maxReplicas | default 10 }} - pollingInterval: {{ .Values.celery_worker_user_files_indexing.autoscaling.pollingInterval | default 30 }} - cooldownPeriod: {{ .Values.celery_worker_user_files_indexing.autoscaling.cooldownPeriod | default 300 }} - idleReplicaCount: {{ .Values.celery_worker_user_files_indexing.autoscaling.idleReplicaCount | default 1 }} - fallback: - failureThreshold: {{ .Values.celery_worker_user_files_indexing.autoscaling.failureThreshold | default 3 }} - replicas: {{ .Values.celery_worker_user_files_indexing.autoscaling.fallbackReplicas | default 1 }} - triggers: - {{- if .Values.celery_worker_user_files_indexing.autoscaling.targetCPUUtilizationPercentage }} - - type: cpu - metricType: Utilization - metadata: - value: "{{ .Values.celery_worker_user_files_indexing.autoscaling.targetCPUUtilizationPercentage }}" - {{- end }} - {{- if .Values.celery_worker_user_files_indexing.autoscaling.targetMemoryUtilizationPercentage }} - - type: memory - metricType: Utilization - metadata: - value: "{{ .Values.celery_worker_user_files_indexing.autoscaling.targetMemoryUtilizationPercentage }}" - {{- end }} - {{- if .Values.celery_worker_user_files_indexing.autoscaling.customTriggers }} - {{- toYaml .Values.celery_worker_user_files_indexing.autoscaling.customTriggers | nindent 4 }} - {{- end }} -{{- end }} diff --git a/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing.yaml b/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing.yaml deleted file mode 100644 index 5b82f7782b2..00000000000 --- a/deployment/helm/charts/onyx/templates/celery-worker-user-files-indexing.yaml +++ /dev/null @@ -1,110 +0,0 @@ -{{- if gt (int .Values.celery_worker_user_files_indexing.replicaCount) 0 }} -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ include "onyx.fullname" . }}-celery-worker-user-files-indexing - labels: - {{- include "onyx.labels" . | nindent 4 }} - {{- with .Values.celery_worker_user_files_indexing.deploymentLabels }} - {{- toYaml . | nindent 4 }} - {{- end }} -spec: - {{- if not .Values.celery_worker_user_files_indexing.autoscaling.enabled }} - replicas: {{ .Values.celery_worker_user_files_indexing.replicaCount }} - {{- end }} - selector: - matchLabels: - {{- include "onyx.selectorLabels" . | nindent 6 }} - {{- if .Values.celery_worker_user_files_indexing.deploymentLabels }} - {{- toYaml .Values.celery_worker_user_files_indexing.deploymentLabels | nindent 6 }} - {{- end }} - template: - metadata: - annotations: - checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }} - {{- with .Values.celery_worker_user_files_indexing.podAnnotations }} - {{- toYaml . | nindent 8 }} - {{- end }} - labels: - {{- include "onyx.labels" . | nindent 8 }} - {{- with .Values.celery_worker_user_files_indexing.deploymentLabels }} - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.celery_worker_user_files_indexing.podLabels }} - {{- toYaml . | nindent 8 }} - {{- end }} - spec: - {{- with .Values.imagePullSecrets }} - imagePullSecrets: - {{- toYaml . | nindent 8 }} - {{- end }} - serviceAccountName: {{ include "onyx.serviceAccountName" . }} - securityContext: - {{- toYaml .Values.celery_shared.podSecurityContext | nindent 8 }} - {{- with .Values.celery_worker_user_files_indexing.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.celery_worker_user_files_indexing.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.celery_worker_user_files_indexing.tolerations }} - tolerations: - {{- toYaml . | nindent 8 }} - {{- end }} - containers: - - name: celery-worker-user-files-indexing - securityContext: - {{- toYaml .Values.celery_shared.securityContext | nindent 12 }} - image: "{{ .Values.celery_shared.image.repository }}:{{ .Values.celery_shared.image.tag | default .Values.global.version }}" - imagePullPolicy: {{ .Values.global.pullPolicy }} - command: - [ - "celery", - "-A", - "onyx.background.celery.versioned_apps.docfetching", - "worker", - "--loglevel=INFO", - "--hostname=user-files-indexing@%n", - "-Q", - "user_files_indexing", - ] - resources: - {{- toYaml .Values.celery_worker_user_files_indexing.resources | nindent 12 }} - envFrom: - - configMapRef: - name: {{ .Values.config.envConfigMapName }} - env: - {{- include "onyx.envSecrets" . | nindent 12}} - {{- with .Values.celery_worker_user_files_indexing.volumeMounts }} - volumeMounts: - {{- toYaml . | nindent 12 }} - {{- end }} - startupProbe: - {{ .Values.celery_shared.startupProbe | toYaml | nindent 12}} - readinessProbe: - {{ .Values.celery_shared.readinessProbe | toYaml | nindent 12}} - exec: - command: - - /bin/bash - - -c - - > - python onyx/background/celery/celery_k8s_probe.py - --probe readiness - --filename /tmp/onyx_k8s_userfilesindexing_readiness.txt - livenessProbe: - {{ .Values.celery_shared.livenessProbe | toYaml | nindent 12}} - exec: - command: - - /bin/bash - - -c - - > - python onyx/background/celery/celery_k8s_probe.py - --probe liveness - --filename /tmp/onyx_k8s_userfilesindexing_liveness.txt - {{- with .Values.celery_worker_user_files_indexing.volumes }} - volumes: - {{- toYaml . | nindent 8 }} - {{- end }} -{{- end }} diff --git a/deployment/helm/charts/onyx/values.yaml b/deployment/helm/charts/onyx/values.yaml index 64f3970cf4a..33870befaaf 100644 --- a/deployment/helm/charts/onyx/values.yaml +++ b/deployment/helm/charts/onyx/values.yaml @@ -620,41 +620,6 @@ celery_worker_primary: tolerations: [] affinity: {} -celery_worker_user_files_indexing: - replicaCount: 1 - autoscaling: - enabled: false - minReplicas: 1 - maxReplicas: 10 - targetCPUUtilizationPercentage: 80 - targetMemoryUtilizationPercentage: 80 - # KEDA specific configurations (only used when autoscaling.engine is set to 'keda') - pollingInterval: 30 # seconds - cooldownPeriod: 300 # seconds - idleReplicaCount: 1 # minimum replicas when idle - failureThreshold: 3 # number of failures before fallback - fallbackReplicas: 1 # replicas to maintain on failure - # Custom triggers for advanced KEDA configurations - customTriggers: [] - podAnnotations: {} - podLabels: - scope: onyx-backend-celery - app: celery-worker-user-files-indexing - deploymentLabels: - app: celery-worker-user-files-indexing - resources: - requests: - cpu: 500m - memory: 512Mi - limits: - cpu: 2000m - memory: 2Gi - volumes: [] # Additional volumes on the output Deployment definition. - volumeMounts: [] # Additional volumeMounts on the output Deployment definition. - nodeSelector: {} - tolerations: [] - affinity: {} - celery_worker_user_file_processing: replicaCount: 1 autoscaling: