diff --git a/api/src/backend/queries/evaluations.py b/api/src/backend/queries/evaluations.py index 0d740df8..69b189f5 100644 --- a/api/src/backend/queries/evaluations.py +++ b/api/src/backend/queries/evaluations.py @@ -440,13 +440,3 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id: FROM miner_agents WHERE version_id = $1 """, version_id) - -@db_operation -async def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool: - """Check if a screener is running an evaluation""" - return await conn.fetchval( - """ - SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running') - """, - validator_hotkey - ) \ No newline at end of file diff --git a/api/src/backend/queries/evaluations.pyi b/api/src/backend/queries/evaluations.pyi index 97d2075d..30d42c99 100644 --- a/api/src/backend/queries/evaluations.pyi +++ b/api/src/backend/queries/evaluations.pyi @@ -11,5 +11,4 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[ async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ... async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ... async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ... -async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ... -async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ... \ No newline at end of file +async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ... \ No newline at end of file diff --git a/api/src/endpoints/upload.py b/api/src/endpoints/upload.py index 417f8ddb..027fa50a 100644 --- a/api/src/endpoints/upload.py +++ b/api/src/endpoints/upload.py @@ -111,18 +111,6 @@ async def post_agent( status_code=503, detail="No stage 1 screeners available for agent evaluation. Please try again later." ) - - # Check is there is an evaluation in the database with the screener's hotkey that has running status - # This is to prevent the case where a screener.is_available() returns true but the screener is actually running an evaluation - # from api.src.backend.queries.evaluations import is_screener_running_evaluation - # is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey) - # if is_screener_running_evaluation: - # logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status") - # raise HTTPException( - # status_code=409, - # detail="No stage 1 screeners available for agent evaluation. Please try again later." - # ) - async with get_transaction() as conn: can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey) if not can_upload: diff --git a/api/src/models/evaluation.py b/api/src/models/evaluation.py index f8e39753..7dd2e230 100644 --- a/api/src/models/evaluation.py +++ b/api/src/models/evaluation.py @@ -105,6 +105,20 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]: async def finish(self, conn: asyncpg.Connection): """Finish evaluation, but retry if >=50% of inferences failed and any run errored""" + + # Check if evaluation is already completed - prevent duplicate finish calls + current_status = await conn.fetchval( + "SELECT status FROM evaluations WHERE evaluation_id = $1", + self.evaluation_id + ) + + if current_status == 'completed': + logger.warning( + f"finish() called on already completed evaluation {self.evaluation_id} " + f"(version: {self.version_id}, validator: {self.validator_hotkey}). " + f"Skipping to prevent agent status overwrites." + ) + return None # DEBUG: Check if this evaluation already has a terminated_reason set current_terminated_reason = await conn.fetchval( @@ -484,19 +498,6 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent """Create screening evaluation""" from api.src.socket.websocket_manager import WebSocketManager - # Safety check: Ensure screener doesn't already have a running evaluation - existing_evaluation = await conn.fetchrow( - """ - SELECT evaluation_id, status FROM evaluations - WHERE validator_hotkey = $1 AND status = 'running' - LIMIT 1 - """, - screener.hotkey - ) - - if existing_evaluation: - logger.error(f"CRITICAL: Screener {screener.hotkey} already has running evaluation {existing_evaluation['evaluation_id']} - refusing to create duplicate screening") - return "", False ws = WebSocketManager.get_instance() @@ -575,7 +576,7 @@ async def screen_next_awaiting_agent(screener: "Screener"): # Log the agents for debugging awaiting_agents = await conn.fetch( """ - SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents + SELECT version_id, miner_hotkey, agent_name, created_at, version_num FROM miner_agents WHERE status = $1 AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys) ORDER BY created_at ASC @@ -585,45 +586,45 @@ async def screen_next_awaiting_agent(screener: "Screener"): for agent in awaiting_agents[:3]: # Log first 3 logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}") - # Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED - logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'") - try: - claimed_agent = await conn.fetchrow( - """ - WITH next_agent AS ( - SELECT version_id FROM miner_agents - WHERE status = $1 - AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys) - ORDER BY created_at ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - UPDATE miner_agents - SET status = $2 - FROM next_agent - WHERE miner_agents.version_id = next_agent.version_id - RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at - """, - target_status, - target_screening_status - ) - except Exception as e: - logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}") - claimed_agent = None + # # Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED + # logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'") + # try: + # claimed_agent = await conn.fetchrow( + # """ + # WITH next_agent AS ( + # SELECT version_id FROM miner_agents + # WHERE status = $1 + # AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys) + # ORDER BY created_at ASC + # FOR UPDATE SKIP LOCKED + # LIMIT 1 + # ) + # UPDATE miner_agents + # SET status = $2 + # FROM next_agent + # WHERE miner_agents.version_id = next_agent.version_id + # RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at + # """, + # target_status, + # target_screening_status + # ) + # except Exception as e: + # logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}") + # claimed_agent = None - if not claimed_agent: + else: screener.set_available() # Ensure available state is set logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting") return - logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})") + logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})") agent = MinerAgent( - version_id=claimed_agent["version_id"], - miner_hotkey=claimed_agent["miner_hotkey"], - agent_name=claimed_agent["agent_name"], - version_num=claimed_agent["version_num"], - created_at=claimed_agent["created_at"], + version_id=awaiting_agents[0]["version_id"], + miner_hotkey=awaiting_agents[0]["miner_hotkey"], + agent_name=awaiting_agents[0]["agent_name"], + version_num=awaiting_agents[0]["version_num"], + created_at=awaiting_agents[0]["created_at"], status=target_screening_status, # Already set to correct status in query ) diff --git a/api/src/models/screener.py b/api/src/models/screener.py index 6de54e60..100bad92 100644 --- a/api/src/models/screener.py +++ b/api/src/models/screener.py @@ -318,6 +318,7 @@ async def get_first_available() -> Optional['Screener']: async def get_first_available_and_reserve(stage: int) -> Optional['Screener']: """Atomically find and reserve first available screener for specific stage - MUST be called within Evaluation lock""" from api.src.socket.websocket_manager import WebSocketManager + from api.src.backend.db_manager import get_db_connection ws_manager = WebSocketManager.get_instance() for client in ws_manager.clients.values(): @@ -325,6 +326,19 @@ async def get_first_available_and_reserve(stage: int) -> Optional['Screener']: client.status == "available" and client.is_available() and client.stage == stage): + + # Double-check against database to ensure screener is truly available + async with get_db_connection() as db_conn: + has_running_evaluation = await db_conn.fetchval( + """ + SELECT EXISTS(SELECT 1 FROM evaluations + WHERE validator_hotkey = $1 AND status = 'running') + """, client.hotkey + ) + + if has_running_evaluation: + logger.warning(f"Screener {client.hotkey} appears available in memory but has running evaluation in database - skipping") + continue # Immediately reserve to prevent race conditions client.status = "reserving"