8686from onyx .db .search_settings import get_current_search_settings
8787from onyx .db .search_settings import get_secondary_search_settings
8888from onyx .db .swap_index import check_and_perform_index_swap
89+ from onyx .db .usage import UsageLimitExceededError
8990from onyx .document_index .factory import get_default_document_index
9091from onyx .file_store .document_batch_storage import DocumentBatchStorage
9192from onyx .file_store .document_batch_storage import get_document_batch_storage
112113from shared_configs .configs import INDEXING_MODEL_SERVER_HOST
113114from shared_configs .configs import INDEXING_MODEL_SERVER_PORT
114115from shared_configs .configs import MULTI_TENANT
116+ from shared_configs .configs import USAGE_LIMITS_ENABLED
115117from shared_configs .contextvars import CURRENT_TENANT_ID_CONTEXTVAR
116118from shared_configs .contextvars import INDEX_ATTEMPT_INFO_CONTEXTVAR
117119
@@ -1279,6 +1281,31 @@ def docprocessing_task(
12791281 INDEX_ATTEMPT_INFO_CONTEXTVAR .reset (token )
12801282
12811283
1284+ def _check_chunk_usage_limit (tenant_id : str ) -> None :
1285+ """Check if chunk indexing usage limit has been exceeded.
1286+
1287+ Raises UsageLimitExceededError if the limit is exceeded.
1288+ """
1289+ if not USAGE_LIMITS_ENABLED :
1290+ return
1291+
1292+ from onyx .db .usage import check_usage_limit
1293+ from onyx .db .usage import UsageType
1294+ from onyx .server .usage_limits import get_limit_for_usage_type
1295+ from onyx .server .usage_limits import is_tenant_on_trial
1296+
1297+ is_trial = is_tenant_on_trial (tenant_id )
1298+ limit = get_limit_for_usage_type (UsageType .CHUNKS_INDEXED , is_trial )
1299+
1300+ with get_session_with_current_tenant () as db_session :
1301+ check_usage_limit (
1302+ db_session = db_session ,
1303+ usage_type = UsageType .CHUNKS_INDEXED ,
1304+ limit = limit ,
1305+ pending_amount = 0 , # Just check current usage
1306+ )
1307+
1308+
12821309def _docprocessing_task (
12831310 index_attempt_id : int ,
12841311 cc_pair_id : int ,
@@ -1290,6 +1317,25 @@ def _docprocessing_task(
12901317 if tenant_id :
12911318 CURRENT_TENANT_ID_CONTEXTVAR .set (tenant_id )
12921319
1320+ # Check if chunk indexing usage limit has been exceeded before processing
1321+ if USAGE_LIMITS_ENABLED :
1322+ try :
1323+ _check_chunk_usage_limit (tenant_id )
1324+ except UsageLimitExceededError as e :
1325+ # Log the error and fail the indexing attempt
1326+ task_logger .error (
1327+ f"Chunk indexing usage limit exceeded for tenant { tenant_id } : { e } "
1328+ )
1329+ with get_session_with_current_tenant () as db_session :
1330+ from onyx .db .index_attempt import mark_attempt_failed
1331+
1332+ mark_attempt_failed (
1333+ index_attempt_id = index_attempt_id ,
1334+ db_session = db_session ,
1335+ failure_reason = str (e ),
1336+ )
1337+ raise
1338+
12931339 task_logger .info (
12941340 f"Processing document batch: "
12951341 f"attempt={ index_attempt_id } "
@@ -1434,6 +1480,23 @@ def _docprocessing_task(
14341480 adapter = adapter ,
14351481 )
14361482
1483+ # Track chunk indexing usage for cloud usage limits
1484+ if USAGE_LIMITS_ENABLED and index_pipeline_result .total_chunks > 0 :
1485+ try :
1486+ from onyx .db .usage import increment_usage
1487+ from onyx .db .usage import UsageType
1488+
1489+ with get_session_with_current_tenant () as usage_db_session :
1490+ increment_usage (
1491+ db_session = usage_db_session ,
1492+ usage_type = UsageType .CHUNKS_INDEXED ,
1493+ amount = index_pipeline_result .total_chunks ,
1494+ )
1495+ usage_db_session .commit ()
1496+ except Exception as e :
1497+ # Log but don't fail indexing if usage tracking fails
1498+ task_logger .warning (f"Failed to track chunk indexing usage: { e } " )
1499+
14371500 # Update batch completion and document counts atomically using database coordination
14381501
14391502 with get_session_with_current_tenant () as db_session , cross_batch_db_lock :
0 commit comments