Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""remove userfile related deprecated fields

Revision ID: a3c1a7904cd0
Revises: 5c3dca366b35
Create Date: 2026-01-06 13:00:30.634396

"""

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "a3c1a7904cd0"
down_revision = "5c3dca366b35"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Drop deprecated user-file migration columns.

    Removes ``user_file.document_id``, ``user_file.document_id_migrated``
    and ``connector_credential_pair.is_user_file``. This is destructive:
    the values stored in these columns are not preserved anywhere, so the
    paired ``downgrade`` can only restore the schema, not the data.
    """
    op.drop_column("user_file", "document_id")
    op.drop_column("user_file", "document_id_migrated")
    op.drop_column("connector_credential_pair", "is_user_file")


def downgrade() -> None:
    """Re-create the columns dropped by :func:`upgrade`.

    NOTE(review): this restores only the schema, not the data — once the
    upgrade has run in production the original column values are gone.
    The server defaults below merely backfill placeholder values
    (``is_user_file=false``, ``document_id=NULL``,
    ``document_id_migrated=true``) for existing rows.
    """
    op.add_column(
        "connector_credential_pair",
        sa.Column("is_user_file", sa.Boolean(), nullable=False, server_default="false"),
    )
    op.add_column(
        "user_file",
        sa.Column("document_id", sa.String(), nullable=True),
    )
    op.add_column(
        "user_file",
        sa.Column(
            "document_id_migrated", sa.Boolean(), nullable=False, server_default="true"
        ),
    )
3 changes: 0 additions & 3 deletions backend/onyx/background/celery/apps/docfetching.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,5 @@ def on_setup_logging(
celery_app.autodiscover_tasks(
[
"onyx.background.celery.tasks.docfetching",
# Ensure the user files indexing worker registers the doc_id migration task
# TODO(subash): remove this once the doc_id migration is complete
"onyx.background.celery.tasks.user_file_processing",
]
)
10 changes: 0 additions & 10 deletions backend/onyx/background/celery/tasks/beat_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,6 @@
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
"name": "user-file-docid-migration",
"task": OnyxCeleryTask.USER_FILE_DOCID_MIGRATION,
"schedule": timedelta(minutes=10),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": BEAT_EXPIRES_DEFAULT,
"queue": OnyxCeleryQueues.USER_FILES_INDEXING,
},
},
{
"name": "check-for-kg-processing",
"task": OnyxCeleryTask.CHECK_KG_PROCESSING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,6 @@ def try_creating_docfetching_task(
# Another indexing attempt is already running
return None

# Determine which queue to use based on whether this is a user file
# TODO: at the moment the indexing pipeline is
# shared between user files and connectors
queue = (
OnyxCeleryQueues.USER_FILES_INDEXING
if cc_pair.is_user_file
else OnyxCeleryQueues.CONNECTOR_DOC_FETCHING
)

# Use higher priority for first-time indexing to ensure new connectors
# get processed before re-indexing of existing connectors
has_successful_attempt = cc_pair.last_successful_index_time is not None
Expand All @@ -99,7 +90,7 @@ def try_creating_docfetching_task(
search_settings_id=search_settings.id,
tenant_id=tenant_id,
),
queue=queue,
queue=OnyxCeleryQueues.CONNECTOR_DOC_FETCHING,
task_id=custom_task_id,
priority=priority,
)
Expand Down
25 changes: 3 additions & 22 deletions backend/onyx/background/celery/tasks/docprocessing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@
from onyx.db.connector_credential_pair import (
fetch_indexable_standard_connector_credential_pair_ids,
)
from onyx.db.connector_credential_pair import (
fetch_indexable_user_file_connector_credential_pair_ids,
)
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.connector_credential_pair import set_cc_pair_repeated_error_state
from onyx.db.engine.sql_engine import get_session_with_current_tenant
Expand Down Expand Up @@ -540,12 +537,7 @@ def check_indexing_completion(
]:
# User file connectors must be paused on success
# NOTE: _run_indexing doesn't update connectors if the index attempt is the future embedding model
# TODO: figure out why this doesn't pause connectors during swap
cc_pair.status = (
ConnectorCredentialPairStatus.PAUSED
if cc_pair.is_user_file
else ConnectorCredentialPairStatus.ACTIVE
)
cc_pair.status = ConnectorCredentialPairStatus.ACTIVE
db_session.commit()

mt_cloud_telemetry(
Expand Down Expand Up @@ -811,13 +803,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
db_session, active_cc_pairs_only=True
)
)
user_file_cc_pair_ids = (
fetch_indexable_user_file_connector_credential_pair_ids(
db_session, search_settings_id=current_search_settings.id
)
)

primary_cc_pair_ids = standard_cc_pair_ids + user_file_cc_pair_ids
primary_cc_pair_ids = standard_cc_pair_ids

# Get CC pairs for secondary search settings
secondary_cc_pair_ids: list[int] = []
Expand All @@ -833,14 +820,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
db_session, active_cc_pairs_only=not include_paused
)
)
user_file_cc_pair_ids = (
fetch_indexable_user_file_connector_credential_pair_ids(
db_session, search_settings_id=secondary_search_settings.id
)
or []
)

secondary_cc_pair_ids = standard_cc_pair_ids + user_file_cc_pair_ids
secondary_cc_pair_ids = standard_cc_pair_ids

# Flag CC pairs in repeated error state for primary/current search settings
with get_session_with_current_tenant() as db_session:
Expand Down
5 changes: 1 addition & 4 deletions backend/onyx/background/celery/tasks/monitoring/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,7 @@ def monitor_celery_queues_helper(
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
)
n_docprocessing = celery_get_queue_length(OnyxCeleryQueues.DOCPROCESSING, r_celery)
n_user_files_indexing = celery_get_queue_length(
OnyxCeleryQueues.USER_FILES_INDEXING, r_celery
)

n_user_file_processing = celery_get_queue_length(
OnyxCeleryQueues.USER_FILE_PROCESSING, r_celery
)
Expand Down Expand Up @@ -924,7 +922,6 @@ def monitor_celery_queues_helper(
f"docfetching_prefetched={len(n_docfetching_prefetched)} "
f"docprocessing={n_docprocessing} "
f"docprocessing_prefetched={len(n_docprocessing_prefetched)} "
f"user_files_indexing={n_user_files_indexing} "
f"user_file_processing={n_user_file_processing} "
f"user_file_project_sync={n_user_file_project_sync} "
f"user_file_delete={n_user_file_delete} "
Expand Down
Loading
Loading