diff --git a/api/src/backend/queries/evaluations.py b/api/src/backend/queries/evaluations.py index 0d740df8..061fdcd0 100644 --- a/api/src/backend/queries/evaluations.py +++ b/api/src/backend/queries/evaluations.py @@ -439,14 +439,4 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id: SELECT miner_hotkey FROM miner_agents WHERE version_id = $1 - """, version_id) - -@db_operation -async def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool: - """Check if a screener is running an evaluation""" - return await conn.fetchval( - """ - SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running') - """, - validator_hotkey - ) \ No newline at end of file + """, version_id) \ No newline at end of file diff --git a/api/src/backend/queries/evaluations.pyi b/api/src/backend/queries/evaluations.pyi index 97d2075d..30d42c99 100644 --- a/api/src/backend/queries/evaluations.pyi +++ b/api/src/backend/queries/evaluations.pyi @@ -11,5 +11,4 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[ async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ... async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ... async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ... -async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ... -async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ... \ No newline at end of file +async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ... \ No newline at end of file diff --git a/api/src/endpoints/upload.py b/api/src/endpoints/upload.py index 417f8ddb..24a7fb03 100644 --- a/api/src/endpoints/upload.py +++ b/api/src/endpoints/upload.py @@ -112,16 +112,6 @@ async def post_agent( detail="No stage 1 screeners available for agent evaluation. Please try again later." 
) - # Check is there is an evaluation in the database with the screener's hotkey that has running status - # This is to prevent the case where a screener.is_available() returns true but the screener is actually running an evaluation - # from api.src.backend.queries.evaluations import is_screener_running_evaluation - # is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey) - # if is_screener_running_evaluation: - # logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status") - # raise HTTPException( - # status_code=409, - # detail="No stage 1 screeners available for agent evaluation. Please try again later." - # ) async with get_transaction() as conn: can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey) @@ -159,6 +149,7 @@ async def post_agent( logger.warning(f"Failed to assign agent {agent.version_id} to screener") else: logger.warning(f"Failed to assign agent {agent.version_id} to screener - screener is not running") + # Screener state is now committed, lock can be released diff --git a/api/src/models/evaluation.py b/api/src/models/evaluation.py index f8e39753..6514f103 100644 --- a/api/src/models/evaluation.py +++ b/api/src/models/evaluation.py @@ -106,16 +106,32 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]: async def finish(self, conn: asyncpg.Connection): """Finish evaluation, but retry if >=50% of inferences failed and any run errored""" - # DEBUG: Check if this evaluation already has a terminated_reason set + # Check if evaluation is already completed - prevent duplicate finish calls + current_status = await conn.fetchval( + "SELECT status FROM evaluations WHERE evaluation_id = $1", + self.evaluation_id + ) + + if current_status == 'completed': + logger.warning( + f"finish() called on already 
completed evaluation {self.evaluation_id} " + f"(version: {self.version_id}, validator: {self.validator_hotkey}). " + f"Skipping to prevent agent status overwrites." + ) + return None + + # Use the evaluation lock to prevent race conditions with disconnection handling + # async with Evaluation.get_lock(): + # DEBUG: Check if this evaluation already has a terminated_reason set current_terminated_reason = await conn.fetchval( "SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1", self.evaluation_id ) if current_terminated_reason: - current_status = await conn.fetchval( - "SELECT status FROM evaluations WHERE evaluation_id = $1", - self.evaluation_id - ) + # current_status = await conn.fetchval( + # "SELECT status FROM evaluations WHERE evaluation_id = $1", + # self.evaluation_id + # ) # Print very noticeable debug information print("=" * 80) @@ -484,19 +500,25 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent """Create screening evaluation""" from api.src.socket.websocket_manager import WebSocketManager - # Safety check: Ensure screener doesn't already have a running evaluation - existing_evaluation = await conn.fetchrow( - """ - SELECT evaluation_id, status FROM evaluations - WHERE validator_hotkey = $1 AND status = 'running' - LIMIT 1 - """, - screener.hotkey - ) + # # Additional safety check: Ensure this agent doesn't already have a running screening at the same stage (lowk useless) + # screener_stage = screener.stage + # agent_running_screening = await conn.fetchval( + # """ + # SELECT COUNT(*) FROM evaluations e + # JOIN miner_agents ma ON e.version_id = ma.version_id + # WHERE ma.version_id = $1 + # AND ( + # (e.validator_hotkey LIKE 'screener-1-%' OR e.validator_hotkey LIKE 'i-0%') + # OR e.validator_hotkey LIKE 'screener-2-%' + # ) + # AND e.status = 'running' + # """, + # agent.version_id + # ) - if existing_evaluation: - logger.error(f"CRITICAL: Screener {screener.hotkey} already has running evaluation 
{existing_evaluation['evaluation_id']} - refusing to create duplicate screening") - return "", False + # if agent_running_screening > 0: + # logger.error(f"CRITICAL: Agent {agent.version_id} already has running screening - refusing to create duplicate screening") + # return "", False ws = WebSocketManager.get_instance() @@ -575,7 +597,7 @@ async def screen_next_awaiting_agent(screener: "Screener"): # Log the agents for debugging awaiting_agents = await conn.fetch( """ - SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents + SELECT version_id, miner_hotkey, agent_name, created_at, version_num FROM miner_agents WHERE status = $1 AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys) ORDER BY created_at ASC @@ -585,45 +607,20 @@ for agent in awaiting_agents[:3]: # Log first 3 logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}") - # Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED - logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'") - try: - claimed_agent = await conn.fetchrow( - """ - WITH next_agent AS ( - SELECT version_id FROM miner_agents - WHERE status = $1 - AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys) - ORDER BY created_at ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - UPDATE miner_agents - SET status = $2 - FROM next_agent - WHERE miner_agents.version_id = next_agent.version_id - RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at - """, - target_status, - target_screening_status - ) - except Exception as e: - logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}") - claimed_agent = None - if not claimed_agent: + else: screener.set_available() # Ensure available state is set 
logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting") return - logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})") + logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})") agent = MinerAgent( - version_id=claimed_agent["version_id"], - miner_hotkey=claimed_agent["miner_hotkey"], - agent_name=claimed_agent["agent_name"], - version_num=claimed_agent["version_num"], - created_at=claimed_agent["created_at"], + version_id=awaiting_agents[0]["version_id"], + miner_hotkey=awaiting_agents[0]["miner_hotkey"], + agent_name=awaiting_agents[0]["agent_name"], + version_num=awaiting_agents[0]["version_num"], + created_at=awaiting_agents[0]["created_at"], status=target_screening_status, # Already set to correct status in query ) @@ -769,6 +766,7 @@ async def handle_validator_disconnection(validator_hotkey: str): @staticmethod async def handle_screener_disconnection(screener_hotkey: str): """Atomically handle screener disconnection: error active evaluations and reset agents""" + # async with Evaluation.get_lock(): async with get_transaction() as conn: # Get active screening evaluations for all screener types active_screenings = await conn.fetch( diff --git a/api/src/models/screener.py b/api/src/models/screener.py index 6de54e60..48a92bca 100644 --- a/api/src/models/screener.py +++ b/api/src/models/screener.py @@ -231,9 +231,10 @@ async def disconnect(self): """Handle screener disconnection""" from api.src.models.evaluation import Evaluation # Explicitly reset status on disconnect to ensure clean state - self.set_available() - logger.info(f"Screener {self.hotkey} disconnected, status reset to: {self.status}") - await Evaluation.handle_screener_disconnection(self.hotkey) + async with Evaluation.get_lock(): + self.set_available() + 
logger.info(f"Screener {self.hotkey} disconnected, status reset to: {self.status}") + await Evaluation.handle_screener_disconnection(self.hotkey) async def finish_screening(self, evaluation_id: str, errored: bool = False, reason: Optional[str] = None): """Finish screening evaluation""" @@ -241,47 +242,46 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, entered finish_screening") - try: - evaluation = await Evaluation.get_by_id(evaluation_id) - if not evaluation or not evaluation.is_screening or evaluation.validator_hotkey != self.hotkey: - logger.warning(f"Screener {self.hotkey}: Invalid finish_screening call for evaluation {evaluation_id}") - return - - async with get_transaction() as conn: - agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) - expected_status = getattr(AgentStatus, f"screening_{self.stage}") - if AgentStatus.from_string(agent_status) != expected_status: - logger.warning(f"Stage {self.stage} screener {self.hotkey}: Evaluation {evaluation_id}: Agent {evaluation.version_id} not in screening_{self.stage} status during finish (current: {agent_status})") - # Clearly a bug here, its somehow set to failed_screening_1 when we hit this if statement - # It should be screening_1, no idea whats setting it to failed_screening_1 - # return + async with Evaluation.get_lock(): # SINGLE LOCK FOR ENTIRE FUNCTION + try: + evaluation = await Evaluation.get_by_id(evaluation_id) + if not evaluation or not evaluation.is_screening or evaluation.validator_hotkey != self.hotkey: + logger.warning(f"Screener {self.hotkey}: Invalid finish_screening call for evaluation {evaluation_id}") + return - if errored: - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}") - await evaluation.error(conn, reason) - logger.info(f"Screener {self.hotkey}: Finishing 
screening {evaluation_id}: Errored with reason: {reason}: done") - notification_targets = None - else: - notification_targets = await evaluation.finish(conn) + async with get_transaction() as conn: + agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) + expected_status = getattr(AgentStatus, f"screening_{self.stage}") + if AgentStatus.from_string(agent_status) != expected_status: + logger.warning(f"Stage {self.stage} screener {self.hotkey}: Evaluation {evaluation_id}: Agent {evaluation.version_id} not in screening_{self.stage} status during finish (current: {agent_status})") + # Clearly a bug here, its somehow set to failed_screening_1 when we hit this if statement + # It should be screening_1, no idea whats setting it to failed_screening_1 + # return + + if errored: + logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}") + await evaluation.error(conn, reason) + logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}: done") + notification_targets = None + else: + notification_targets = await evaluation.finish(conn) - from api.src.socket.websocket_manager import WebSocketManager - ws_manager = WebSocketManager.get_instance() - await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) + from api.src.socket.websocket_manager import WebSocketManager + ws_manager = WebSocketManager.get_instance() + await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) - self.set_available() - - logger.info(f"Screener {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") - - # Handle notifications AFTER transaction commits - if notification_targets: - # Notify stage 2 screener when stage 1 completes - if notification_targets.get("stage2_screener"): - async with Evaluation.get_lock(): - await 
Evaluation.screen_next_awaiting_agent(notification_targets["stage2_screener"]) + self.set_available() + + logger.info(f"Screener {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") - # Notify validators with proper lock protection - for validator in notification_targets.get("validators", []): - async with Evaluation.get_lock(): + # Handle notifications AFTER transaction commits + if notification_targets: + # Notify stage 2 screener when stage 1 completes + if notification_targets.get("stage2_screener"): + await Evaluation.screen_next_awaiting_agent(notification_targets["stage2_screener"]) + + # Notify validators + for validator in notification_targets.get("validators", []): if validator.is_available(): success = await validator.start_evaluation_and_send(evaluation_id) if success: @@ -290,16 +290,18 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas logger.warning(f"Failed to assign evaluation {evaluation_id} to validator {validator.hotkey}") else: logger.info(f"Validator {validator.hotkey} not available for evaluation {evaluation_id}") - - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Got to end of try block") - finally: - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, in finally block") - # Single atomic reset and reassignment - async with Evaluation.get_lock(): + self.set_available() logger.info(f"Screener {self.hotkey}: Reset to available and looking for next agent") await Evaluation.screen_next_awaiting_agent(self) - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, exiting finally block") + logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, completed successfully") + + except Exception as e: + logger.error(f"Screener {self.hotkey}: Error in finish_screening: {e}") + # Ensure screener is reset even on error + self.set_available() + await Evaluation.screen_next_awaiting_agent(self) + 
logger.info(f"Screener {self.hotkey}: Reset to available after error in finish_screening") @staticmethod async def get_first_available() -> Optional['Screener']: @@ -318,6 +320,8 @@ async def get_first_available() -> Optional['Screener']: async def get_first_available_and_reserve(stage: int) -> Optional['Screener']: """Atomically find and reserve first available screener for specific stage - MUST be called within Evaluation lock""" from api.src.socket.websocket_manager import WebSocketManager + from api.src.backend.db_manager import get_db_connection + ws_manager = WebSocketManager.get_instance() for client in ws_manager.clients.values(): @@ -326,6 +330,20 @@ async def get_first_available_and_reserve(stage: int) -> Optional['Screener']: client.is_available() and client.stage == stage): + # Double-check against database to ensure screener is truly available + async with get_db_connection() as db_conn: + has_running_evaluation = await db_conn.fetchval( + """ + SELECT EXISTS(SELECT 1 FROM evaluations + WHERE validator_hotkey = $1 AND status = 'running') + """, + client.hotkey + ) + + if has_running_evaluation: + logger.warning(f"Screener {client.hotkey} appears available in memory but has running evaluation in database - skipping") + continue + # Immediately reserve to prevent race conditions client.status = "reserving" logger.info(f"Reserved stage {stage} screener {client.hotkey} for work assignment") diff --git a/api/src/models/validator.py b/api/src/models/validator.py index ed1be1df..b5da2755 100644 --- a/api/src/models/validator.py +++ b/api/src/models/validator.py @@ -149,7 +149,8 @@ async def connect(self): async def disconnect(self): """Handle validator disconnection""" from api.src.models.evaluation import Evaluation - await Evaluation.handle_validator_disconnection(self.hotkey) + async with Evaluation.get_lock(): + await Evaluation.handle_validator_disconnection(self.hotkey) async def get_next_evaluation(self) -> Optional[str]: """Get next evaluation ID for 
this validator""" @@ -167,44 +168,42 @@ async def get_next_evaluation(self) -> Optional[str]: async def finish_evaluation(self, evaluation_id: str, errored: bool = False, reason: Optional[str] = None): """Finish evaluation and automatically look for next work""" from api.src.models.evaluation import Evaluation - - try: - evaluation = await Evaluation.get_by_id(evaluation_id) - if not evaluation or evaluation.validator_hotkey != self.hotkey: - logger.warning(f"Validator {self.hotkey}: Invalid finish_evaluation call for evaluation {evaluation_id}") - return - - async with get_transaction() as conn: - agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) - if AgentStatus.from_string(agent_status) != AgentStatus.evaluating: - logger.warning(f"Validator {self.hotkey}: Agent {evaluation.version_id} not in evaluating status during finish") + async with Evaluation.get_lock(): + try: + evaluation = await Evaluation.get_by_id(evaluation_id) + if not evaluation or evaluation.validator_hotkey != self.hotkey: + logger.warning(f"Validator {self.hotkey}: Invalid finish_evaluation call for evaluation {evaluation_id}") return - if errored: - await evaluation.error(conn, reason) - notification_targets = None - else: - notification_targets = await evaluation.finish(conn) - - from api.src.socket.websocket_manager import WebSocketManager - ws_manager = WebSocketManager.get_instance() - await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) - - logger.info(f"Validator {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") - - # Handle notifications AFTER transaction commits - if notification_targets: - # Note: Validators typically don't trigger stage transitions, but handle any notifications - for validator in notification_targets.get("validators", []): - async with Evaluation.get_lock(): - if validator.is_available(): - success = await 
validator.start_evaluation_and_send(evaluation_id) - if success: - logger.info(f"Successfully assigned evaluation {evaluation_id} to validator {validator.hotkey}") - - finally: - # Single atomic reset and reassignment - async with Evaluation.get_lock(): + async with get_transaction() as conn: + agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) + if AgentStatus.from_string(agent_status) != AgentStatus.evaluating: + logger.warning(f"Validator {self.hotkey}: Agent {evaluation.version_id} not in evaluating status during finish") + return + + if errored: + await evaluation.error(conn, reason) + notification_targets = None + else: + notification_targets = await evaluation.finish(conn) + + from api.src.socket.websocket_manager import WebSocketManager + ws_manager = WebSocketManager.get_instance() + await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) + + logger.info(f"Validator {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") + + # Handle notifications AFTER transaction commits + if notification_targets: + # Note: Validators typically don't trigger stage transitions, but handle any notifications + for validator in notification_targets.get("validators", []): + # NOTE(review): Evaluation.get_lock() is already held by the enclosing 'async with' at the top of this method; re-acquiring the non-reentrant asyncio.Lock here would deadlock + if validator.is_available(): + success = await validator.start_evaluation_and_send(evaluation_id) + if success: + logger.info(f"Successfully assigned evaluation {evaluation_id} to validator {validator.hotkey}") + + finally: self.set_available() logger.info(f"Validator {self.hotkey}: Reset to available and looking for next evaluation") await self._check_and_start_next_evaluation() diff --git a/api/src/socket/handlers/handle_heartbeat.py b/api/src/socket/handlers/handle_heartbeat.py index 1468dc77..e7f7738b 100644 --- a/api/src/socket/handlers/handle_heartbeat.py +++ b/api/src/socket/handlers/handle_heartbeat.py @@ -30,7 +30,7 @@ async 
def handle_heartbeat( client.status_mismatch_count += 1 logger.warning(f"Client {client.hotkey} status mismatch #{client.status_mismatch_count}: Client says {alleged_status}, but Platform says {client.status}") - if client.status_mismatch_count >= 25: + if client.status_mismatch_count >= 100: logger.error(f"Client {client.hotkey} has {client.status_mismatch_count} consecutive status mismatches. Forcing reconnection.") await websocket.send_json({"event": "error", "error": f"Too many status mismatches ({client.status_mismatch_count}). Reconnecting..."}) await websocket.close()