diff --git a/api/src/backend/queries/evaluations.py b/api/src/backend/queries/evaluations.py
index 061fdcd0..0d740df8 100644
--- a/api/src/backend/queries/evaluations.py
+++ b/api/src/backend/queries/evaluations.py
@@ -439,4 +439,14 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id:
         SELECT miner_hotkey FROM miner_agents WHERE version_id = $1
-        """, version_id)
\ No newline at end of file
+        """, version_id)
+
+@db_operation
+async def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool:
+    """Check if a screener is running an evaluation"""
+    return await conn.fetchval(
+        """
+        SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running')
+        """,
+        validator_hotkey
+    )
\ No newline at end of file
diff --git a/api/src/backend/queries/evaluations.pyi b/api/src/backend/queries/evaluations.pyi
index 30d42c99..97d2075d 100644
--- a/api/src/backend/queries/evaluations.pyi
+++ b/api/src/backend/queries/evaluations.pyi
@@ -11,4 +11,5 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[
 async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ...
 async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ...
 async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ...
-async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
\ No newline at end of file
+async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
+async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ...
\ No newline at end of file
diff --git a/api/src/endpoints/upload.py b/api/src/endpoints/upload.py
index 24a7fb03..417f8ddb 100644
--- a/api/src/endpoints/upload.py
+++ b/api/src/endpoints/upload.py
@@ -112,6 +112,16 @@ async def post_agent(
             detail="No stage 1 screeners available for agent evaluation. Please try again later."
         )
 
+    # Check if there is an evaluation in the database with the screener's hotkey that has running status
+    # This is to prevent the case where screener.is_available() returns true but the screener is actually running an evaluation
+    # from api.src.backend.queries.evaluations import is_screener_running_evaluation
+    # is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey)
+    # if is_screener_running_evaluation:
+    #     logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status")
+    #     raise HTTPException(
+    #         status_code=409,
+    #         detail="No stage 1 screeners available for agent evaluation. Please try again later."
+    #     )
 
     async with get_transaction() as conn:
         can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey)
@@ -149,7 +159,6 @@ async def post_agent(
                 logger.warning(f"Failed to assign agent {agent.version_id} to screener")
             else:
                 logger.warning(f"Failed to assign agent {agent.version_id} to screener - screener is not running")
-    # Screener state is now committed, lock can be released
diff --git a/api/src/models/evaluation.py b/api/src/models/evaluation.py
index 6514f103..f8e39753 100644
--- a/api/src/models/evaluation.py
+++ b/api/src/models/evaluation.py
@@ -106,32 +106,16 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]:
     async def finish(self, conn: asyncpg.Connection):
         """Finish evaluation, but retry if >=50% of inferences failed and any run errored"""
-        # Check if evaluation is already completed - prevent duplicate finish calls
-        current_status = await conn.fetchval(
-            "SELECT status FROM evaluations WHERE evaluation_id = $1",
-            self.evaluation_id
-        )
-
-        if current_status == 'completed':
-            logger.warning(
-                f"finish() called on already completed evaluation {self.evaluation_id} "
-                f"(version: {self.version_id}, validator: {self.validator_hotkey}). "
-                f"Skipping to prevent agent status overwrites."
-            )
-            return None
-
-        # Use the evaluation lock to prevent race conditions with disconnection handling
-        # async with Evaluation.get_lock():
-        # DEBUG: Check if this evaluation already has a terminated_reason set
+        # DEBUG: Check if this evaluation already has a terminated_reason set
         current_terminated_reason = await conn.fetchval(
             "SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1",
             self.evaluation_id
         )
 
         if current_terminated_reason:
-            # current_status = await conn.fetchval(
-            #     "SELECT status FROM evaluations WHERE evaluation_id = $1",
-            #     self.evaluation_id
-            # )
+            current_status = await conn.fetchval(
+                "SELECT status FROM evaluations WHERE evaluation_id = $1",
+                self.evaluation_id
+            )
 
             # Print very noticeable debug information
             print("=" * 80)
@@ -500,25 +484,19 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent
         """Create screening evaluation"""
         from api.src.socket.websocket_manager import WebSocketManager
 
-        # # Additional safety check: Ensure this agent doesn't already have a running screening at the same stage (lowk useless)
-        # screener_stage = screener.stage
-        # agent_running_screening = await conn.fetchval(
-        #     """
-        #     SELECT COUNT(*) FROM evaluations e
-        #     JOIN miner_agents ma ON e.version_id = ma.version_id
-        #     WHERE ma.version_id = $1
-        #     AND (
-        #         (e.validator_hotkey LIKE 'screener-1-%' OR e.validator_hotkey LIKE 'i-0%')
-        #         OR e.validator_hotkey LIKE 'screener-2-%'
-        #     )
-        #     AND e.status = 'running'
-        #     """,
-        #     agent.version_id
-        # )
+        # Safety check: Ensure screener doesn't already have a running evaluation
+        existing_evaluation = await conn.fetchrow(
+            """
+            SELECT evaluation_id, status FROM evaluations
+            WHERE validator_hotkey = $1 AND status = 'running'
+            LIMIT 1
+            """,
+            screener.hotkey
+        )
 
-        # if agent_running_screening > 0:
-        #     logger.error(f"CRITICAL: Agent {agent.version_id} already has running screening - refusing to create duplicate screening")
-        #     return "", False
+        if existing_evaluation:
+            logger.error(f"CRITICAL: Screener {screener.hotkey} already has running evaluation {existing_evaluation['evaluation_id']} - refusing to create duplicate screening")
+            return "", False
 
         ws = WebSocketManager.get_instance()
@@ -597,7 +575,7 @@ async def screen_next_awaiting_agent(screener: "Screener"):
         # Log the agents for debugging
         awaiting_agents = await conn.fetch(
             """
-            SELECT version_id, miner_hotkey, agent_name, created_at, version_num, created_at FROM miner_agents
+            SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents
             WHERE status = $1
             AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
             ORDER BY created_at ASC
@@ -607,20 +585,45 @@ async def screen_next_awaiting_agent(screener: "Screener"):
         for agent in awaiting_agents[:3]:  # Log first 3
             logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}")
 
+        # Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED
+        logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'")
+        try:
+            claimed_agent = await conn.fetchrow(
+                """
+                WITH next_agent AS (
+                    SELECT version_id FROM miner_agents
+                    WHERE status = $1
+                    AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
+                    ORDER BY created_at ASC
+                    FOR UPDATE SKIP LOCKED
+                    LIMIT 1
+                )
+                UPDATE miner_agents
+                SET status = $2
+                FROM next_agent
+                WHERE miner_agents.version_id = next_agent.version_id
+                RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at
+                """,
+                target_status,
+                target_screening_status
+            )
+        except Exception as e:
+            logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}")
+            claimed_agent = None
 
-        else:
+        if not claimed_agent:
             screener.set_available()  # Ensure available state is set
             logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting")
             return
 
-        logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})")
+        logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})")
 
         agent = MinerAgent(
-            version_id=awaiting_agents[0]["version_id"],
-            miner_hotkey=awaiting_agents[0]["miner_hotkey"],
-            agent_name=awaiting_agents[0]["agent_name"],
-            version_num=awaiting_agents[0]["version_num"],
-            created_at=awaiting_agents[0]["created_at"],
+            version_id=claimed_agent["version_id"],
+            miner_hotkey=claimed_agent["miner_hotkey"],
+            agent_name=claimed_agent["agent_name"],
+            version_num=claimed_agent["version_num"],
+            created_at=claimed_agent["created_at"],
             status=target_screening_status,  # Already set to correct status in query
         )
@@ -766,7 +769,6 @@ async def handle_validator_disconnection(validator_hotkey: str):
     @staticmethod
     async def handle_screener_disconnection(screener_hotkey: str):
         """Atomically handle screener disconnection: error active evaluations and reset agents"""
-        # async with Evaluation.get_lock():
         async with get_transaction() as conn:
             # Get active screening evaluations for all screener types
             active_screenings = await conn.fetch(
diff --git a/api/src/models/screener.py b/api/src/models/screener.py
index 48a92bca..6de54e60 100644
--- a/api/src/models/screener.py
+++ b/api/src/models/screener.py
@@ -231,10 +231,9 @@ async def disconnect(self):
         """Handle screener disconnection"""
         from api.src.models.evaluation import Evaluation
         # Explicitly reset status on disconnect to ensure clean state
-        async with Evaluation.get_lock():
-            self.set_available()
-            logger.info(f"Screener {self.hotkey} disconnected, status reset to: {self.status}")
-            await Evaluation.handle_screener_disconnection(self.hotkey)
+        self.set_available()
+        logger.info(f"Screener {self.hotkey} disconnected, status reset to: {self.status}")
+        await Evaluation.handle_screener_disconnection(self.hotkey)
 
     async def finish_screening(self, evaluation_id: str, errored: bool = False, reason: Optional[str] = None):
         """Finish screening evaluation"""
@@ -242,46 +241,47 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas
 
         logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, entered finish_screening")
 
-        async with Evaluation.get_lock():  # SINGLE LOCK FOR ENTIRE FUNCTION
-            try:
-                evaluation = await Evaluation.get_by_id(evaluation_id)
-                if not evaluation or not evaluation.is_screening or evaluation.validator_hotkey != self.hotkey:
-                    logger.warning(f"Screener {self.hotkey}: Invalid finish_screening call for evaluation {evaluation_id}")
-                    return
+        try:
+            evaluation = await Evaluation.get_by_id(evaluation_id)
+            if not evaluation or not evaluation.is_screening or evaluation.validator_hotkey != self.hotkey:
+                logger.warning(f"Screener {self.hotkey}: Invalid finish_screening call for evaluation {evaluation_id}")
+                return
+
+            async with get_transaction() as conn:
+                agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id)
+                expected_status = getattr(AgentStatus, f"screening_{self.stage}")
+                if AgentStatus.from_string(agent_status) != expected_status:
+                    logger.warning(f"Stage {self.stage} screener {self.hotkey}: Evaluation {evaluation_id}: Agent {evaluation.version_id} not in screening_{self.stage} status during finish (current: {agent_status})")
+                    # Clearly a bug here, it's somehow set to failed_screening_1 when we hit this if statement
+                    # It should be screening_1, no idea what's setting it to failed_screening_1
+                    # return
 
-                async with get_transaction() as conn:
-                    agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id)
-                    expected_status = getattr(AgentStatus, f"screening_{self.stage}")
-                    if AgentStatus.from_string(agent_status) != expected_status:
-                        logger.warning(f"Stage {self.stage} screener {self.hotkey}: Evaluation {evaluation_id}: Agent {evaluation.version_id} not in screening_{self.stage} status during finish (current: {agent_status})")
-                        # Clearly a bug here, its somehow set to failed_screening_1 when we hit this if statement
-                        # It should be screening_1, no idea whats setting it to failed_screening_1
-                        # return
-
-                    if errored:
-                        logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}")
-                        await evaluation.error(conn, reason)
-                        logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}: done")
-                        notification_targets = None
-                    else:
-                        notification_targets = await evaluation.finish(conn)
+                if errored:
+                    logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}")
+                    await evaluation.error(conn, reason)
+                    logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}: done")
+                    notification_targets = None
+                else:
+                    notification_targets = await evaluation.finish(conn)
 
-                    from api.src.socket.websocket_manager import WebSocketManager
-                    ws_manager = WebSocketManager.get_instance()
-                    await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id})
+                from api.src.socket.websocket_manager import WebSocketManager
+                ws_manager = WebSocketManager.get_instance()
+                await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id})
 
-                    self.set_available()
-
-                    logger.info(f"Screener {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}")
-
-                    # Handle notifications AFTER transaction commits
-                    if notification_targets:
-                        # Notify stage 2 screener when stage 1 completes
-                        if notification_targets.get("stage2_screener"):
-                            await Evaluation.screen_next_awaiting_agent(notification_targets["stage2_screener"])
+                self.set_available()
 
-                        # Notify validators
-                        for validator in notification_targets.get("validators", []):
+                logger.info(f"Screener {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}")
+
+            # Handle notifications AFTER transaction commits
+            if notification_targets:
+                # Notify stage 2 screener when stage 1 completes
+                if notification_targets.get("stage2_screener"):
+                    async with Evaluation.get_lock():
+                        await Evaluation.screen_next_awaiting_agent(notification_targets["stage2_screener"])
+
+                # Notify validators with proper lock protection
+                for validator in notification_targets.get("validators", []):
+                    async with Evaluation.get_lock():
                         if validator.is_available():
                             success = await validator.start_evaluation_and_send(evaluation_id)
                             if success:
@@ -290,18 +290,16 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas
                                 logger.info(f"Successfully assigned evaluation {evaluation_id} to validator {validator.hotkey}")
                             else:
                                 logger.warning(f"Failed to assign evaluation {evaluation_id} to validator {validator.hotkey}")
                         else:
                             logger.info(f"Validator {validator.hotkey} not available for evaluation {evaluation_id}")
-
+
+            logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Got to end of try block")
+        finally:
+            logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, in finally block")
+            # Single atomic reset and reassignment
+            async with Evaluation.get_lock():
                 self.set_available()
                 logger.info(f"Screener {self.hotkey}: Reset to available and looking for next agent")
                 await Evaluation.screen_next_awaiting_agent(self)
-                logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, completed successfully")
-
-            except Exception as e:
-                logger.error(f"Screener {self.hotkey}: Error in finish_screening: {e}")
-                # Ensure screener is reset even on error
-                self.set_available()
-                await Evaluation.screen_next_awaiting_agent(self)
-                logger.info(f"Screener {self.hotkey}: Reset to available after error in finish_screening")
+            logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, exiting finally block")
@@ -320,8 +318,6 @@ async def get_first_available() -> Optional['Screener']:
     async def get_first_available_and_reserve(stage: int) -> Optional['Screener']:
         """Atomically find and reserve first available screener for specific stage - MUST be called within Evaluation lock"""
         from api.src.socket.websocket_manager import WebSocketManager
-        from api.src.backend.db_manager import get_db_connection
-
         ws_manager = WebSocketManager.get_instance()
 
         for client in ws_manager.clients.values():
@@ -330,20 +326,6 @@ async def get_first_available_and_reserve(stage: int) -> Optional['Screener']:
                     client.is_available() and
                     client.stage == stage):
 
-                # Double-check against database to ensure screener is truly available
-                async with get_db_connection() as db_conn:
-                    has_running_evaluation = await db_conn.fetchval(
-                        """
-                        SELECT EXISTS(SELECT 1 FROM evaluations
-                        WHERE validator_hotkey = $1 AND status = 'running')
-                        """,
-                        client.hotkey
-                    )
-
-                    if has_running_evaluation:
-                        logger.warning(f"Screener {client.hotkey} appears available in memory but has running evaluation in database - skipping")
-                        continue
-
                 # Immediately reserve to prevent race conditions
                 client.status = "reserving"
                 logger.info(f"Reserved stage {stage} screener {client.hotkey} for work assignment")
diff --git a/api/src/models/validator.py b/api/src/models/validator.py
index b5da2755..ed1be1df 100644
--- a/api/src/models/validator.py
+++ b/api/src/models/validator.py
@@ -149,8 +149,7 @@ async def connect(self):
     async def disconnect(self):
         """Handle validator disconnection"""
        from api.src.models.evaluation import Evaluation
-        async with Evaluation.get_lock():
-            await Evaluation.handle_validator_disconnection(self.hotkey)
+        await Evaluation.handle_validator_disconnection(self.hotkey)
 
     async def get_next_evaluation(self) -> Optional[str]:
         """Get next evaluation ID for this validator"""
@@ -168,42 +167,44 @@ async def get_next_evaluation(self) -> Optional[str]:
     async def finish_evaluation(self, evaluation_id: str, errored: bool = False, reason: Optional[str] = None):
         """Finish evaluation and automatically look for next work"""
         from api.src.models.evaluation import Evaluation
-        async with Evaluation.get_lock():
-            try:
-                evaluation = await Evaluation.get_by_id(evaluation_id)
-                if not evaluation or evaluation.validator_hotkey != self.hotkey:
-                    logger.warning(f"Validator {self.hotkey}: Invalid finish_evaluation call for evaluation {evaluation_id}")
+
+        try:
+            evaluation = await Evaluation.get_by_id(evaluation_id)
+            if not evaluation or evaluation.validator_hotkey != self.hotkey:
+                logger.warning(f"Validator {self.hotkey}: Invalid finish_evaluation call for evaluation {evaluation_id}")
+                return
+
+            async with get_transaction() as conn:
+                agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id)
+                if AgentStatus.from_string(agent_status) != AgentStatus.evaluating:
+                    logger.warning(f"Validator {self.hotkey}: Agent {evaluation.version_id} not in evaluating status during finish")
                     return
 
-                async with get_transaction() as conn:
-                    agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id)
-                    if AgentStatus.from_string(agent_status) != AgentStatus.evaluating:
-                        logger.warning(f"Validator {self.hotkey}: Agent {evaluation.version_id} not in evaluating status during finish")
-                        return
-
-                    if errored:
-                        await evaluation.error(conn, reason)
-                        notification_targets = None
-                    else:
-                        notification_targets = await evaluation.finish(conn)
-
-                    from api.src.socket.websocket_manager import WebSocketManager
-                    ws_manager = WebSocketManager.get_instance()
-                    await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id})
-
-                    logger.info(f"Validator {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}")
-
-                    # Handle notifications AFTER transaction commits
-                    if notification_targets:
-                        # Note: Validators typically don't trigger stage transitions, but handle any notifications
-                        for validator in notification_targets.get("validators", []):
-                            async with Evaluation.get_lock():
-                                if validator.is_available():
-                                    success = await validator.start_evaluation_and_send(evaluation_id)
-                                    if success:
-                                        logger.info(f"Successfully assigned evaluation {evaluation_id} to validator {validator.hotkey}")
-
-            finally:
+                if errored:
+                    await evaluation.error(conn, reason)
+                    notification_targets = None
+                else:
+                    notification_targets = await evaluation.finish(conn)
+
+                from api.src.socket.websocket_manager import WebSocketManager
+                ws_manager = WebSocketManager.get_instance()
+                await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id})
+
+                logger.info(f"Validator {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}")
+
+            # Handle notifications AFTER transaction commits
+            if notification_targets:
+                # Note: Validators typically don't trigger stage transitions, but handle any notifications
+                for validator in notification_targets.get("validators", []):
+                    async with Evaluation.get_lock():
+                        if validator.is_available():
+                            success = await validator.start_evaluation_and_send(evaluation_id)
+                            if success:
+                                logger.info(f"Successfully assigned evaluation {evaluation_id} to validator {validator.hotkey}")
+
+        finally:
+            # Single atomic reset and reassignment
+            async with Evaluation.get_lock():
                 self.set_available()
                 logger.info(f"Validator {self.hotkey}: Reset to available and looking for next evaluation")
                 await self._check_and_start_next_evaluation()
diff --git a/api/src/socket/handlers/handle_heartbeat.py b/api/src/socket/handlers/handle_heartbeat.py
index e7f7738b..1468dc77 100644
--- a/api/src/socket/handlers/handle_heartbeat.py
+++ b/api/src/socket/handlers/handle_heartbeat.py
@@ -30,7 +30,7 @@ async def handle_heartbeat(
         client.status_mismatch_count += 1
         logger.warning(f"Client {client.hotkey} status mismatch #{client.status_mismatch_count}: Client says {alleged_status}, but Platform says {client.status}")
 
-        if client.status_mismatch_count >= 100:
+        if client.status_mismatch_count >= 25:
             logger.error(f"Client {client.hotkey} has {client.status_mismatch_count} consecutive status mismatches. Forcing reconnection.")
             await websocket.send_json({"event": "error", "error": f"Too many status mismatches ({client.status_mismatch_count}). Reconnecting..."})
             await websocket.close()