From 42439cef169bebd5c9761ac740afe1a720288c11 Mon Sep 17 00:00:00 2001 From: Faizan Naseer Date: Mon, 6 Oct 2025 21:36:38 -0400 Subject: [PATCH 1/3] added locks to fix race condition + upload fail safe for screening_1 bug --- api/src/endpoints/upload.py | 8 ++ api/src/models/evaluation.py | 245 ++++++++++++++++++----------------- api/src/models/screener.py | 45 +++---- 3 files changed, 155 insertions(+), 143 deletions(-) diff --git a/api/src/endpoints/upload.py b/api/src/endpoints/upload.py index fa6470a3..a3413c14 100644 --- a/api/src/endpoints/upload.py +++ b/api/src/endpoints/upload.py @@ -148,6 +148,14 @@ async def post_agent( logger.warning(f"Failed to assign agent {agent.version_id} to screener") else: logger.warning(f"Failed to assign agent {agent.version_id} to screener - screener is not running") + + # CRITICAL FIX: Reset agent status back to awaiting_screening_1 to prevent stuck agents + # The agent was created but never properly assigned to screener + await conn.execute( + "UPDATE miner_agents SET status = 'awaiting_screening_1' WHERE version_id = $1", + agent.version_id + ) + logger.warning(f"Reset agent {agent.version_id} status to awaiting_screening_1 due to failed screener assignment") # Screener state is now committed, lock can be released diff --git a/api/src/models/evaluation.py b/api/src/models/evaluation.py index f8e39753..185a658b 100644 --- a/api/src/models/evaluation.py +++ b/api/src/models/evaluation.py @@ -106,122 +106,124 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]: async def finish(self, conn: asyncpg.Connection): """Finish evaluation, but retry if >=50% of inferences failed and any run errored""" - # DEBUG: Check if this evaluation already has a terminated_reason set - current_terminated_reason = await conn.fetchval( - "SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1", - self.evaluation_id - ) - if current_terminated_reason: - current_status = await conn.fetchval( - "SELECT status FROM 
evaluations WHERE evaluation_id = $1", + # Use the evaluation lock to prevent race conditions with disconnection handling + async with Evaluation.get_lock(): + # DEBUG: Check if this evaluation already has a terminated_reason set + current_terminated_reason = await conn.fetchval( + "SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1", self.evaluation_id ) + if current_terminated_reason: + current_status = await conn.fetchval( + "SELECT status FROM evaluations WHERE evaluation_id = $1", + self.evaluation_id + ) + + # Print very noticeable debug information + print("=" * 80) + print("🚨 CRITICAL DEBUG: finish() called on evaluation with existing terminated_reason! 🚨") + print("=" * 80) + print(f"Evaluation ID: {self.evaluation_id}") + print(f"Version ID: {self.version_id}") + print(f"Validator Hotkey: {self.validator_hotkey}") + print(f"Current Status: {current_status}") + print(f"Existing terminated_reason: {current_terminated_reason}") + print(f"Is Screening: {self.is_screening}") + if self.is_screening: + print(f"Screener Stage: {self.screener_stage}") + print(f"Current Time: {datetime.now().isoformat()}") + print() + print("CALL STACK TRACE:") + print("-" * 40) + traceback.print_stack() + print("=" * 80) + + # Also log it for persistent record + logger.error( + f"CRITICAL: finish() called on evaluation {self.evaluation_id} " + f"(version_id: {self.version_id}, validator: {self.validator_hotkey}) " + f"that already has terminated_reason: '{current_terminated_reason}'. " + f"Current status: {current_status}. This will result in inconsistent state!" + ) - # Print very noticeable debug information - print("=" * 80) - print("🚨 CRITICAL DEBUG: finish() called on evaluation with existing terminated_reason! 
🚨") - print("=" * 80) - print(f"Evaluation ID: {self.evaluation_id}") - print(f"Version ID: {self.version_id}") - print(f"Validator Hotkey: {self.validator_hotkey}") - print(f"Current Status: {current_status}") - print(f"Existing terminated_reason: {current_terminated_reason}") - print(f"Is Screening: {self.is_screening}") - if self.is_screening: - print(f"Screener Stage: {self.screener_stage}") - print(f"Current Time: {datetime.now().isoformat()}") - print() - print("CALL STACK TRACE:") - print("-" * 40) - traceback.print_stack() - print("=" * 80) + # Check if we should retry due to inference failures + successful, total, success_rate, any_run_errored = await self._check_inference_success_rate(conn) - # Also log it for persistent record - logger.error( - f"CRITICAL: finish() called on evaluation {self.evaluation_id} " - f"(version_id: {self.version_id}, validator: {self.validator_hotkey}) " - f"that already has terminated_reason: '{current_terminated_reason}'. " - f"Current status: {current_status}. This will result in inconsistent state!" - ) - - # Check if we should retry due to inference failures - successful, total, success_rate, any_run_errored = await self._check_inference_success_rate(conn) - - # If we have inferences and >=50% failed AND any run errored, retry instead of finishing - if total > 0 and success_rate < 0.5 and any_run_errored: - logger.info(f"Evaluation {self.evaluation_id} completed but {successful}/{total} successful inferences ({success_rate:.1%}) with run errors. 
Retrying...") - await self.reset_to_waiting(conn) - return - - await conn.execute("UPDATE evaluations SET status = 'completed', finished_at = NOW() WHERE evaluation_id = $1", self.evaluation_id) - self.status = EvaluationStatus.completed - self.score = await conn.fetchval("SELECT score FROM evaluations WHERE evaluation_id = $1", self.evaluation_id) + # If we have inferences and >=50% failed AND any run errored, retry instead of finishing + if total > 0 and success_rate < 0.5 and any_run_errored: + logger.info(f"Evaluation {self.evaluation_id} completed but {successful}/{total} successful inferences ({success_rate:.1%}) with run errors. Retrying...") + await self.reset_to_waiting(conn) + return + + await conn.execute("UPDATE evaluations SET status = 'completed', finished_at = NOW() WHERE evaluation_id = $1", self.evaluation_id) + self.status = EvaluationStatus.completed + self.score = await conn.fetchval("SELECT score FROM evaluations WHERE evaluation_id = $1", self.evaluation_id) - # Store validators to notify after agent status update - validators_to_notify = [] - stage2_screener_to_notify = None + # Store validators to notify after agent status update + validators_to_notify = [] + stage2_screener_to_notify = None - # If it's a screener, handle stage-specific logic - if self.is_screening: - stage = self.screener_stage - threshold = SCREENING_1_THRESHOLD if stage == 1 else SCREENING_2_THRESHOLD - if self.score < threshold: - logger.info(f"Stage {stage} screening failed for agent {self.version_id} with score {self.score} (threshold: {threshold})") - else: - logger.info(f"Stage {stage} screening passed for agent {self.version_id} with score {self.score} (threshold: {threshold})") - - if stage == 1: - # Stage 1 passed -> find ONE available stage 2 screener - from api.src.socket.websocket_manager import WebSocketManager - ws_manager = WebSocketManager.get_instance() - for client in ws_manager.clients.values(): - if client.get_type() != "screener": - continue - 
screener: Screener = client - - if screener.stage == 2 and screener.is_available(): - stage2_screener_to_notify = screener - break - elif stage == 2: - # Stage 2 passed -> check if we should prune immediately - combined_screener_score, score_error = await Screener.get_combined_screener_score(conn, self.version_id) - # ^ if this is None, we should likely not be here, because it means that either there was no screener 1 or screener 2 evaluation, but in which case how would be here anyway? - if score_error: - await send_slack_message(f"Stage 2 screener score error for version {self.version_id}: {score_error}") - top_agent = await MinerAgentScored.get_top_agent(conn) - - if top_agent and combined_screener_score is not None and (top_agent.avg_score - combined_screener_score) > PRUNE_THRESHOLD: - # Score is too low, prune miner agent and don't create evaluations - await conn.execute("UPDATE miner_agents SET status = 'pruned' WHERE version_id = $1", self.version_id) - logger.info(f"Pruned agent {self.version_id} immediately after screener-2 with combined score {combined_screener_score:.3f} (threshold: {top_agent.avg_score - PRUNE_THRESHOLD:.3f})") - return { - "stage2_screener": None, - "validators": [] - } - - # Score is acceptable -> notify validators - from api.src.models.validator import Validator - - # Create evaluation records but don't notify yet - import random - all_validators = await Validator.get_connected() - validators_to_notify = random.sample(all_validators, min(2, len(all_validators))) - for validator in validators_to_notify: - if (combined_screener_score is None): - await send_slack_message(f"111 Screener score is None when creating evaluation for validator {validator.hotkey}, version {self.version_id}") - await send_slack_message(f"Evaluation object: {str(self)}") - await self.create_for_validator(conn, self.version_id, validator.hotkey, combined_screener_score) + # If it's a screener, handle stage-specific logic + if self.is_screening: + stage = 
self.screener_stage + threshold = SCREENING_1_THRESHOLD if stage == 1 else SCREENING_2_THRESHOLD + if self.score < threshold: + logger.info(f"Stage {stage} screening failed for agent {self.version_id} with score {self.score} (threshold: {threshold})") + else: + logger.info(f"Stage {stage} screening passed for agent {self.version_id} with score {self.score} (threshold: {threshold})") - # Prune low-scoring evaluations after creating validator evaluations - await Evaluation.prune_low_waiting(conn) + if stage == 1: + # Stage 1 passed -> find ONE available stage 2 screener + from api.src.socket.websocket_manager import WebSocketManager + ws_manager = WebSocketManager.get_instance() + for client in ws_manager.clients.values(): + if client.get_type() != "screener": + continue + screener: Screener = client + + if screener.stage == 2 and screener.is_available(): + stage2_screener_to_notify = screener + break + elif stage == 2: + # Stage 2 passed -> check if we should prune immediately + combined_screener_score, score_error = await Screener.get_combined_screener_score(conn, self.version_id) + # ^ if this is None, we should likely not be here, because it means that either there was no screener 1 or screener 2 evaluation, but in which case how would be here anyway? 
+ if score_error: + await send_slack_message(f"Stage 2 screener score error for version {self.version_id}: {score_error}") + top_agent = await MinerAgentScored.get_top_agent(conn) + + if top_agent and combined_screener_score is not None and (top_agent.avg_score - combined_screener_score) > PRUNE_THRESHOLD: + # Score is too low, prune miner agent and don't create evaluations + await conn.execute("UPDATE miner_agents SET status = 'pruned' WHERE version_id = $1", self.version_id) + logger.info(f"Pruned agent {self.version_id} immediately after screener-2 with combined score {combined_screener_score:.3f} (threshold: {top_agent.avg_score - PRUNE_THRESHOLD:.3f})") + return { + "stage2_screener": None, + "validators": [] + } + + # Score is acceptable -> notify validators + from api.src.models.validator import Validator + + # Create evaluation records but don't notify yet + import random + all_validators = await Validator.get_connected() + validators_to_notify = random.sample(all_validators, min(2, len(all_validators))) + for validator in validators_to_notify: + if (combined_screener_score is None): + await send_slack_message(f"111 Screener score is None when creating evaluation for validator {validator.hotkey}, version {self.version_id}") + await send_slack_message(f"Evaluation object: {str(self)}") + await self.create_for_validator(conn, self.version_id, validator.hotkey, combined_screener_score) + + # Prune low-scoring evaluations after creating validator evaluations + await Evaluation.prune_low_waiting(conn) - await self._update_agent_status(conn) + await self._update_agent_status(conn) - # Return notification targets to be handled OUTSIDE this transaction - return { - "stage2_screener": stage2_screener_to_notify, - "validators": validators_to_notify - } + # Return notification targets to be handled OUTSIDE this transaction + return { + "stage2_screener": stage2_screener_to_notify, + "validators": validators_to_notify + } async def _check_inference_success_rate(self, 
conn: asyncpg.Connection) -> Tuple[int, int, float, bool]: """Check inference success rate for this evaluation @@ -769,21 +771,22 @@ async def handle_validator_disconnection(validator_hotkey: str): @staticmethod async def handle_screener_disconnection(screener_hotkey: str): """Atomically handle screener disconnection: error active evaluations and reset agents""" - async with get_transaction() as conn: - # Get active screening evaluations for all screener types - active_screenings = await conn.fetch( - """ - SELECT evaluation_id, version_id FROM evaluations - WHERE validator_hotkey = $1 AND status IN ('running', 'waiting') - AND (validator_hotkey LIKE 'screener-%' OR validator_hotkey LIKE 'i-0%') - """, - screener_hotkey, - ) + async with Evaluation.get_lock(): + async with get_transaction() as conn: + # Get active screening evaluations for all screener types + active_screenings = await conn.fetch( + """ + SELECT evaluation_id, version_id FROM evaluations + WHERE validator_hotkey = $1 AND status IN ('running', 'waiting') + AND (validator_hotkey LIKE 'screener-%' OR validator_hotkey LIKE 'i-0%') + """, + screener_hotkey, + ) - for screening_row in active_screenings: - evaluation = await Evaluation.get_by_id(screening_row["evaluation_id"]) - if evaluation: - await evaluation.error(conn, "Disconnected from screener (error code 1)") + for screening_row in active_screenings: + evaluation = await Evaluation.get_by_id(screening_row["evaluation_id"]) + if evaluation: + await evaluation.error(conn, "Disconnected from screener (error code 1)") @staticmethod async def startup_recovery(): diff --git a/api/src/models/screener.py b/api/src/models/screener.py index 6de54e60..91fa54f1 100644 --- a/api/src/models/screener.py +++ b/api/src/models/screener.py @@ -247,30 +247,31 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas logger.warning(f"Screener {self.hotkey}: Invalid finish_screening call for evaluation {evaluation_id}") return - async with 
get_transaction() as conn: - agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) - expected_status = getattr(AgentStatus, f"screening_{self.stage}") - if AgentStatus.from_string(agent_status) != expected_status: - logger.warning(f"Stage {self.stage} screener {self.hotkey}: Evaluation {evaluation_id}: Agent {evaluation.version_id} not in screening_{self.stage} status during finish (current: {agent_status})") - # Clearly a bug here, its somehow set to failed_screening_1 when we hit this if statement - # It should be screening_1, no idea whats setting it to failed_screening_1 - # return - - if errored: - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}") - await evaluation.error(conn, reason) - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}: done") - notification_targets = None - else: - notification_targets = await evaluation.finish(conn) + async with Evaluation.get_lock(): + async with get_transaction() as conn: + agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) + expected_status = getattr(AgentStatus, f"screening_{self.stage}") + if AgentStatus.from_string(agent_status) != expected_status: + logger.warning(f"Stage {self.stage} screener {self.hotkey}: Evaluation {evaluation_id}: Agent {evaluation.version_id} not in screening_{self.stage} status during finish (current: {agent_status})") + # Clearly a bug here, its somehow set to failed_screening_1 when we hit this if statement + # It should be screening_1, no idea whats setting it to failed_screening_1 + # return + + if errored: + logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: {reason}") + await evaluation.error(conn, reason) + logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Errored with reason: 
{reason}: done") + notification_targets = None + else: + notification_targets = await evaluation.finish(conn) - from api.src.socket.websocket_manager import WebSocketManager - ws_manager = WebSocketManager.get_instance() - await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) + from api.src.socket.websocket_manager import WebSocketManager + ws_manager = WebSocketManager.get_instance() + await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) - self.set_available() - - logger.info(f"Screener {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") + self.set_available() + + logger.info(f"Screener {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") # Handle notifications AFTER transaction commits if notification_targets: From 6ba640919c6d51a2bcbff8b493f318849b437a60 Mon Sep 17 00:00:00 2001 From: Faizan Naseer Date: Tue, 7 Oct 2025 18:18:33 -0400 Subject: [PATCH 2/3] added locks to disconnect & finish functions + safety checks + remove duplicate agent status changes + remove mismatch disconnect due to internal states + remove unused db query --- api/src/backend/queries/evaluations.py | 12 +- api/src/backend/queries/evaluations.pyi | 3 +- api/src/endpoints/upload.py | 17 - api/src/models/evaluation.py | 333 ++++++++++---------- api/src/models/screener.py | 71 +++-- api/src/models/validator.py | 73 +++-- api/src/socket/handlers/handle_heartbeat.py | 14 +- 7 files changed, 253 insertions(+), 270 deletions(-) diff --git a/api/src/backend/queries/evaluations.py b/api/src/backend/queries/evaluations.py index 0d740df8..061fdcd0 100644 --- a/api/src/backend/queries/evaluations.py +++ b/api/src/backend/queries/evaluations.py @@ -439,14 +439,4 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id: SELECT miner_hotkey FROM miner_agents WHERE version_id = $1 - """, version_id) - -@db_operation -async 
def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool: - """Check if a screener is running an evaluation""" - return await conn.fetchval( - """ - SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running') - """, - validator_hotkey - ) \ No newline at end of file + """, version_id) \ No newline at end of file diff --git a/api/src/backend/queries/evaluations.pyi b/api/src/backend/queries/evaluations.pyi index 97d2075d..30d42c99 100644 --- a/api/src/backend/queries/evaluations.pyi +++ b/api/src/backend/queries/evaluations.pyi @@ -11,5 +11,4 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[ async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ... async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ... async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ... -async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ... -async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ... \ No newline at end of file +async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ... \ No newline at end of file diff --git a/api/src/endpoints/upload.py b/api/src/endpoints/upload.py index 9edb0686..24a7fb03 100644 --- a/api/src/endpoints/upload.py +++ b/api/src/endpoints/upload.py @@ -112,16 +112,6 @@ async def post_agent( detail="No stage 1 screeners available for agent evaluation. Please try again later." 
) - # Check is there is an evaluation in the database with the screener's hotkey that has running status - # This is to prevent the case where a screener.is_available() returns true but the screener is actually running an evaluation - # from api.src.backend.queries.evaluations import is_screener_running_evaluation - # is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey) - # if is_screener_running_evaluation: - # logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status") - # raise HTTPException( - # status_code=409, - # detail="No stage 1 screeners available for agent evaluation. Please try again later." - # ) async with get_transaction() as conn: can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey) @@ -160,13 +150,6 @@ async def post_agent( else: logger.warning(f"Failed to assign agent {agent.version_id} to screener - screener is not running") - # CRITICAL FIX: Reset agent status back to awaiting_screening_1 to prevent stuck agents - # The agent was created but never properly assigned to screener - await conn.execute( - "UPDATE miner_agents SET status = 'awaiting_screening_1' WHERE version_id = $1", - agent.version_id - ) - logger.warning(f"Reset agent {agent.version_id} status to awaiting_screening_1 due to failed screener assignment") # Screener state is now committed, lock can be released diff --git a/api/src/models/evaluation.py b/api/src/models/evaluation.py index 185a658b..6514f103 100644 --- a/api/src/models/evaluation.py +++ b/api/src/models/evaluation.py @@ -106,124 +106,138 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]: async def finish(self, conn: asyncpg.Connection): """Finish evaluation, but retry if >=50% of inferences failed and any run errored""" + # Check if evaluation is already 
completed - prevent duplicate finish calls + current_status = await conn.fetchval( + "SELECT status FROM evaluations WHERE evaluation_id = $1", + self.evaluation_id + ) + + if current_status == 'completed': + logger.warning( + f"finish() called on already completed evaluation {self.evaluation_id} " + f"(version: {self.version_id}, validator: {self.validator_hotkey}). " + f"Skipping to prevent agent status overwrites." + ) + return None + # Use the evaluation lock to prevent race conditions with disconnection handling - async with Evaluation.get_lock(): + # async with Evaluation.get_lock(): # DEBUG: Check if this evaluation already has a terminated_reason set - current_terminated_reason = await conn.fetchval( - "SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1", - self.evaluation_id - ) - if current_terminated_reason: - current_status = await conn.fetchval( - "SELECT status FROM evaluations WHERE evaluation_id = $1", - self.evaluation_id - ) - - # Print very noticeable debug information - print("=" * 80) - print("🚨 CRITICAL DEBUG: finish() called on evaluation with existing terminated_reason! 🚨") - print("=" * 80) - print(f"Evaluation ID: {self.evaluation_id}") - print(f"Version ID: {self.version_id}") - print(f"Validator Hotkey: {self.validator_hotkey}") - print(f"Current Status: {current_status}") - print(f"Existing terminated_reason: {current_terminated_reason}") - print(f"Is Screening: {self.is_screening}") - if self.is_screening: - print(f"Screener Stage: {self.screener_stage}") - print(f"Current Time: {datetime.now().isoformat()}") - print() - print("CALL STACK TRACE:") - print("-" * 40) - traceback.print_stack() - print("=" * 80) - - # Also log it for persistent record - logger.error( - f"CRITICAL: finish() called on evaluation {self.evaluation_id} " - f"(version_id: {self.version_id}, validator: {self.validator_hotkey}) " - f"that already has terminated_reason: '{current_terminated_reason}'. " - f"Current status: {current_status}. 
This will result in inconsistent state!" - ) - - # Check if we should retry due to inference failures - successful, total, success_rate, any_run_errored = await self._check_inference_success_rate(conn) + current_terminated_reason = await conn.fetchval( + "SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1", + self.evaluation_id + ) + if current_terminated_reason: + # current_status = await conn.fetchval( + # "SELECT status FROM evaluations WHERE evaluation_id = $1", + # self.evaluation_id + # ) - # If we have inferences and >=50% failed AND any run errored, retry instead of finishing - if total > 0 and success_rate < 0.5 and any_run_errored: - logger.info(f"Evaluation {self.evaluation_id} completed but {successful}/{total} successful inferences ({success_rate:.1%}) with run errors. Retrying...") - await self.reset_to_waiting(conn) - return + # Print very noticeable debug information + print("=" * 80) + print("🚨 CRITICAL DEBUG: finish() called on evaluation with existing terminated_reason! 
🚨") + print("=" * 80) + print(f"Evaluation ID: {self.evaluation_id}") + print(f"Version ID: {self.version_id}") + print(f"Validator Hotkey: {self.validator_hotkey}") + print(f"Current Status: {current_status}") + print(f"Existing terminated_reason: {current_terminated_reason}") + print(f"Is Screening: {self.is_screening}") + if self.is_screening: + print(f"Screener Stage: {self.screener_stage}") + print(f"Current Time: {datetime.now().isoformat()}") + print() + print("CALL STACK TRACE:") + print("-" * 40) + traceback.print_stack() + print("=" * 80) - await conn.execute("UPDATE evaluations SET status = 'completed', finished_at = NOW() WHERE evaluation_id = $1", self.evaluation_id) - self.status = EvaluationStatus.completed - self.score = await conn.fetchval("SELECT score FROM evaluations WHERE evaluation_id = $1", self.evaluation_id) + # Also log it for persistent record + logger.error( + f"CRITICAL: finish() called on evaluation {self.evaluation_id} " + f"(version_id: {self.version_id}, validator: {self.validator_hotkey}) " + f"that already has terminated_reason: '{current_terminated_reason}'. " + f"Current status: {current_status}. This will result in inconsistent state!" + ) + + # Check if we should retry due to inference failures + successful, total, success_rate, any_run_errored = await self._check_inference_success_rate(conn) + + # If we have inferences and >=50% failed AND any run errored, retry instead of finishing + if total > 0 and success_rate < 0.5 and any_run_errored: + logger.info(f"Evaluation {self.evaluation_id} completed but {successful}/{total} successful inferences ({success_rate:.1%}) with run errors. 
Retrying...") + await self.reset_to_waiting(conn) + return + + await conn.execute("UPDATE evaluations SET status = 'completed', finished_at = NOW() WHERE evaluation_id = $1", self.evaluation_id) + self.status = EvaluationStatus.completed + self.score = await conn.fetchval("SELECT score FROM evaluations WHERE evaluation_id = $1", self.evaluation_id) - # Store validators to notify after agent status update - validators_to_notify = [] - stage2_screener_to_notify = None + # Store validators to notify after agent status update + validators_to_notify = [] + stage2_screener_to_notify = None - # If it's a screener, handle stage-specific logic - if self.is_screening: - stage = self.screener_stage - threshold = SCREENING_1_THRESHOLD if stage == 1 else SCREENING_2_THRESHOLD - if self.score < threshold: - logger.info(f"Stage {stage} screening failed for agent {self.version_id} with score {self.score} (threshold: {threshold})") - else: - logger.info(f"Stage {stage} screening passed for agent {self.version_id} with score {self.score} (threshold: {threshold})") - - if stage == 1: - # Stage 1 passed -> find ONE available stage 2 screener - from api.src.socket.websocket_manager import WebSocketManager - ws_manager = WebSocketManager.get_instance() - for client in ws_manager.clients.values(): - if client.get_type() != "screener": - continue - screener: Screener = client - - if screener.stage == 2 and screener.is_available(): - stage2_screener_to_notify = screener - break - elif stage == 2: - # Stage 2 passed -> check if we should prune immediately - combined_screener_score, score_error = await Screener.get_combined_screener_score(conn, self.version_id) - # ^ if this is None, we should likely not be here, because it means that either there was no screener 1 or screener 2 evaluation, but in which case how would be here anyway? 
- if score_error: - await send_slack_message(f"Stage 2 screener score error for version {self.version_id}: {score_error}") - top_agent = await MinerAgentScored.get_top_agent(conn) - - if top_agent and combined_screener_score is not None and (top_agent.avg_score - combined_screener_score) > PRUNE_THRESHOLD: - # Score is too low, prune miner agent and don't create evaluations - await conn.execute("UPDATE miner_agents SET status = 'pruned' WHERE version_id = $1", self.version_id) - logger.info(f"Pruned agent {self.version_id} immediately after screener-2 with combined score {combined_screener_score:.3f} (threshold: {top_agent.avg_score - PRUNE_THRESHOLD:.3f})") - return { - "stage2_screener": None, - "validators": [] - } - - # Score is acceptable -> notify validators - from api.src.models.validator import Validator - - # Create evaluation records but don't notify yet - import random - all_validators = await Validator.get_connected() - validators_to_notify = random.sample(all_validators, min(2, len(all_validators))) - for validator in validators_to_notify: - if (combined_screener_score is None): - await send_slack_message(f"111 Screener score is None when creating evaluation for validator {validator.hotkey}, version {self.version_id}") - await send_slack_message(f"Evaluation object: {str(self)}") - await self.create_for_validator(conn, self.version_id, validator.hotkey, combined_screener_score) + # If it's a screener, handle stage-specific logic + if self.is_screening: + stage = self.screener_stage + threshold = SCREENING_1_THRESHOLD if stage == 1 else SCREENING_2_THRESHOLD + if self.score < threshold: + logger.info(f"Stage {stage} screening failed for agent {self.version_id} with score {self.score} (threshold: {threshold})") + else: + logger.info(f"Stage {stage} screening passed for agent {self.version_id} with score {self.score} (threshold: {threshold})") + + if stage == 1: + # Stage 1 passed -> find ONE available stage 2 screener + from 
api.src.socket.websocket_manager import WebSocketManager + ws_manager = WebSocketManager.get_instance() + for client in ws_manager.clients.values(): + if client.get_type() != "screener": + continue + screener: Screener = client - # Prune low-scoring evaluations after creating validator evaluations - await Evaluation.prune_low_waiting(conn) + if screener.stage == 2 and screener.is_available(): + stage2_screener_to_notify = screener + break + elif stage == 2: + # Stage 2 passed -> check if we should prune immediately + combined_screener_score, score_error = await Screener.get_combined_screener_score(conn, self.version_id) + # ^ if this is None, we should likely not be here, because it means that either there was no screener 1 or screener 2 evaluation, but in which case how would be here anyway? + if score_error: + await send_slack_message(f"Stage 2 screener score error for version {self.version_id}: {score_error}") + top_agent = await MinerAgentScored.get_top_agent(conn) + + if top_agent and combined_screener_score is not None and (top_agent.avg_score - combined_screener_score) > PRUNE_THRESHOLD: + # Score is too low, prune miner agent and don't create evaluations + await conn.execute("UPDATE miner_agents SET status = 'pruned' WHERE version_id = $1", self.version_id) + logger.info(f"Pruned agent {self.version_id} immediately after screener-2 with combined score {combined_screener_score:.3f} (threshold: {top_agent.avg_score - PRUNE_THRESHOLD:.3f})") + return { + "stage2_screener": None, + "validators": [] + } + + # Score is acceptable -> notify validators + from api.src.models.validator import Validator + + # Create evaluation records but don't notify yet + import random + all_validators = await Validator.get_connected() + validators_to_notify = random.sample(all_validators, min(2, len(all_validators))) + for validator in validators_to_notify: + if (combined_screener_score is None): + await send_slack_message(f"111 Screener score is None when creating evaluation for 
validator {validator.hotkey}, version {self.version_id}") + await send_slack_message(f"Evaluation object: {str(self)}") + await self.create_for_validator(conn, self.version_id, validator.hotkey, combined_screener_score) + + # Prune low-scoring evaluations after creating validator evaluations + await Evaluation.prune_low_waiting(conn) - await self._update_agent_status(conn) + await self._update_agent_status(conn) - # Return notification targets to be handled OUTSIDE this transaction - return { - "stage2_screener": stage2_screener_to_notify, - "validators": validators_to_notify - } + # Return notification targets to be handled OUTSIDE this transaction + return { + "stage2_screener": stage2_screener_to_notify, + "validators": validators_to_notify + } async def _check_inference_success_rate(self, conn: asyncpg.Connection) -> Tuple[int, int, float, bool]: """Check inference success rate for this evaluation @@ -486,19 +500,25 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent """Create screening evaluation""" from api.src.socket.websocket_manager import WebSocketManager - # Safety check: Ensure screener doesn't already have a running evaluation - existing_evaluation = await conn.fetchrow( - """ - SELECT evaluation_id, status FROM evaluations - WHERE validator_hotkey = $1 AND status = 'running' - LIMIT 1 - """, - screener.hotkey - ) + # # Additional safety check: Ensure this agent doesn't already have a running screening at the same stage (lowk useless) + # screener_stage = screener.stage + # agent_running_screening = await conn.fetchval( + # """ + # SELECT COUNT(*) FROM evaluations e + # JOIN miner_agents ma ON e.version_id = ma.version_id + # WHERE ma.version_id = $1 + # AND ( + # (e.validator_hotkey LIKE 'screener-1-%' OR e.validator_hotkey LIKE 'i-0%') + # OR e.validator_hotkey LIKE 'screener-2-%' + # ) + # AND e.status = 'running' + # """, + # agent.version_id + # ) - if existing_evaluation: - logger.error(f"CRITICAL: Screener 
{screener.hotkey} already has running evaluation {existing_evaluation['evaluation_id']} - refusing to create duplicate screening")
-        return "", False
+    # if agent_running_screening > 0:
+    #     logger.error(f"CRITICAL: Agent {agent.version_id} already has running screening - refusing to create duplicate screening")
+    #     return "", False
 
     ws = WebSocketManager.get_instance()
 
@@ -577,7 +597,7 @@ async def screen_next_awaiting_agent(screener: "Screener"):
         # Log the agents for debugging
         awaiting_agents = await conn.fetch(
             """
-            SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents
+            SELECT version_id, miner_hotkey, agent_name, created_at, version_num FROM miner_agents
             WHERE status = $1
             AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
             ORDER BY created_at ASC
@@ -587,45 +607,20 @@
         for agent in awaiting_agents[:3]:  # Log first 3
             logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}")
 
-        # Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED
-        logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'")
-        try:
-            claimed_agent = await conn.fetchrow(
-                """
-                WITH next_agent AS (
-                    SELECT version_id FROM miner_agents
-                    WHERE status = $1
-                    AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
-                    ORDER BY created_at ASC
-                    FOR UPDATE SKIP LOCKED
-                    LIMIT 1
-                )
-                UPDATE miner_agents
-                SET status = $2
-                FROM next_agent
-                WHERE miner_agents.version_id = next_agent.version_id
-                RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at
-                """,
-                target_status,
-                target_screening_status
-            )
-        except Exception as e:
-            logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}")
-            claimed_agent = None
-        if not claimed_agent:
+        else:
screener.set_available()  # Ensure available state is set
             logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting")
             return
 
-        logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})")
+        logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})")
 
         agent = MinerAgent(
-            version_id=claimed_agent["version_id"],
-            miner_hotkey=claimed_agent["miner_hotkey"],
-            agent_name=claimed_agent["agent_name"],
-            version_num=claimed_agent["version_num"],
-            created_at=claimed_agent["created_at"],
+            version_id=awaiting_agents[0]["version_id"],
+            miner_hotkey=awaiting_agents[0]["miner_hotkey"],
+            agent_name=awaiting_agents[0]["agent_name"],
+            version_num=awaiting_agents[0]["version_num"],
+            created_at=awaiting_agents[0]["created_at"],
             status=target_screening_status,  # Already set to correct status in query
         )
 
@@ -771,22 +766,22 @@ async def handle_validator_disconnection(validator_hotkey: str):
     @staticmethod
     async def handle_screener_disconnection(screener_hotkey: str):
         """Atomically handle screener disconnection: error active evaluations and reset agents"""
-        async with Evaluation.get_lock():
-            async with get_transaction() as conn:
-                # Get active screening evaluations for all screener types
-                active_screenings = await conn.fetch(
-                    """
-                    SELECT evaluation_id, version_id FROM evaluations
-                    WHERE validator_hotkey = $1 AND status IN ('running', 'waiting')
-                    AND (validator_hotkey LIKE 'screener-%' OR validator_hotkey LIKE 'i-0%')
-                    """,
-                    screener_hotkey,
-                )
+        # NOTE: Evaluation.get_lock() is intentionally NOT acquired here: callers (Screener.disconnect / Validator.disconnect) now hold it, and re-acquiring would deadlock.
+        async with get_transaction() as conn:
+            # Get active screening evaluations for all screener types
+            active_screenings = await conn.fetch(
+                """
+                SELECT evaluation_id, version_id FROM evaluations
+                WHERE validator_hotkey = $1 AND status IN ('running', 'waiting')
AND (validator_hotkey LIKE 'screener-%' OR validator_hotkey LIKE 'i-0%') + """, + screener_hotkey, + ) - for screening_row in active_screenings: - evaluation = await Evaluation.get_by_id(screening_row["evaluation_id"]) - if evaluation: - await evaluation.error(conn, "Disconnected from screener (error code 1)") + for screening_row in active_screenings: + evaluation = await Evaluation.get_by_id(screening_row["evaluation_id"]) + if evaluation: + await evaluation.error(conn, "Disconnected from screener (error code 1)") @staticmethod async def startup_recovery(): diff --git a/api/src/models/screener.py b/api/src/models/screener.py index 91fa54f1..48a92bca 100644 --- a/api/src/models/screener.py +++ b/api/src/models/screener.py @@ -231,9 +231,10 @@ async def disconnect(self): """Handle screener disconnection""" from api.src.models.evaluation import Evaluation # Explicitly reset status on disconnect to ensure clean state - self.set_available() - logger.info(f"Screener {self.hotkey} disconnected, status reset to: {self.status}") - await Evaluation.handle_screener_disconnection(self.hotkey) + async with Evaluation.get_lock(): + self.set_available() + logger.info(f"Screener {self.hotkey} disconnected, status reset to: {self.status}") + await Evaluation.handle_screener_disconnection(self.hotkey) async def finish_screening(self, evaluation_id: str, errored: bool = False, reason: Optional[str] = None): """Finish screening evaluation""" @@ -241,13 +242,13 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, entered finish_screening") - try: - evaluation = await Evaluation.get_by_id(evaluation_id) - if not evaluation or not evaluation.is_screening or evaluation.validator_hotkey != self.hotkey: - logger.warning(f"Screener {self.hotkey}: Invalid finish_screening call for evaluation {evaluation_id}") - return - - async with Evaluation.get_lock(): + async with 
Evaluation.get_lock(): # SINGLE LOCK FOR ENTIRE FUNCTION + try: + evaluation = await Evaluation.get_by_id(evaluation_id) + if not evaluation or not evaluation.is_screening or evaluation.validator_hotkey != self.hotkey: + logger.warning(f"Screener {self.hotkey}: Invalid finish_screening call for evaluation {evaluation_id}") + return + async with get_transaction() as conn: agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) expected_status = getattr(AgentStatus, f"screening_{self.stage}") @@ -272,17 +273,15 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas self.set_available() logger.info(f"Screener {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") - - # Handle notifications AFTER transaction commits - if notification_targets: - # Notify stage 2 screener when stage 1 completes - if notification_targets.get("stage2_screener"): - async with Evaluation.get_lock(): - await Evaluation.screen_next_awaiting_agent(notification_targets["stage2_screener"]) - # Notify validators with proper lock protection - for validator in notification_targets.get("validators", []): - async with Evaluation.get_lock(): + # Handle notifications AFTER transaction commits + if notification_targets: + # Notify stage 2 screener when stage 1 completes + if notification_targets.get("stage2_screener"): + await Evaluation.screen_next_awaiting_agent(notification_targets["stage2_screener"]) + + # Notify validators + for validator in notification_targets.get("validators", []): if validator.is_available(): success = await validator.start_evaluation_and_send(evaluation_id) if success: @@ -291,16 +290,18 @@ async def finish_screening(self, evaluation_id: str, errored: bool = False, reas logger.warning(f"Failed to assign evaluation {evaluation_id} to validator {validator.hotkey}") else: logger.info(f"Validator {validator.hotkey} not available for evaluation {evaluation_id}") - - 
logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}: Got to end of try block") - finally: - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, in finally block") - # Single atomic reset and reassignment - async with Evaluation.get_lock(): + self.set_available() logger.info(f"Screener {self.hotkey}: Reset to available and looking for next agent") await Evaluation.screen_next_awaiting_agent(self) - logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, exiting finally block") + logger.info(f"Screener {self.hotkey}: Finishing screening {evaluation_id}, completed successfully") + + except Exception as e: + logger.error(f"Screener {self.hotkey}: Error in finish_screening: {e}") + # Ensure screener is reset even on error + self.set_available() + await Evaluation.screen_next_awaiting_agent(self) + logger.info(f"Screener {self.hotkey}: Reset to available after error in finish_screening") @staticmethod async def get_first_available() -> Optional['Screener']: @@ -319,6 +320,8 @@ async def get_first_available() -> Optional['Screener']: async def get_first_available_and_reserve(stage: int) -> Optional['Screener']: """Atomically find and reserve first available screener for specific stage - MUST be called within Evaluation lock""" from api.src.socket.websocket_manager import WebSocketManager + from api.src.backend.db_manager import get_db_connection + ws_manager = WebSocketManager.get_instance() for client in ws_manager.clients.values(): @@ -327,6 +330,20 @@ async def get_first_available_and_reserve(stage: int) -> Optional['Screener']: client.is_available() and client.stage == stage): + # Double-check against database to ensure screener is truly available + async with get_db_connection() as db_conn: + has_running_evaluation = await db_conn.fetchval( + """ + SELECT EXISTS(SELECT 1 FROM evaluations + WHERE validator_hotkey = $1 AND status = 'running') + """, + client.hotkey + ) + + if has_running_evaluation: + 
logger.warning(f"Screener {client.hotkey} appears available in memory but has running evaluation in database - skipping") + continue + # Immediately reserve to prevent race conditions client.status = "reserving" logger.info(f"Reserved stage {stage} screener {client.hotkey} for work assignment") diff --git a/api/src/models/validator.py b/api/src/models/validator.py index ed1be1df..b5da2755 100644 --- a/api/src/models/validator.py +++ b/api/src/models/validator.py @@ -149,7 +149,8 @@ async def connect(self): async def disconnect(self): """Handle validator disconnection""" from api.src.models.evaluation import Evaluation - await Evaluation.handle_validator_disconnection(self.hotkey) + async with Evaluation.get_lock(): + await Evaluation.handle_validator_disconnection(self.hotkey) async def get_next_evaluation(self) -> Optional[str]: """Get next evaluation ID for this validator""" @@ -167,44 +168,42 @@ async def get_next_evaluation(self) -> Optional[str]: async def finish_evaluation(self, evaluation_id: str, errored: bool = False, reason: Optional[str] = None): """Finish evaluation and automatically look for next work""" from api.src.models.evaluation import Evaluation - - try: - evaluation = await Evaluation.get_by_id(evaluation_id) - if not evaluation or evaluation.validator_hotkey != self.hotkey: - logger.warning(f"Validator {self.hotkey}: Invalid finish_evaluation call for evaluation {evaluation_id}") - return - - async with get_transaction() as conn: - agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) - if AgentStatus.from_string(agent_status) != AgentStatus.evaluating: - logger.warning(f"Validator {self.hotkey}: Agent {evaluation.version_id} not in evaluating status during finish") + async with Evaluation.get_lock(): + try: + evaluation = await Evaluation.get_by_id(evaluation_id) + if not evaluation or evaluation.validator_hotkey != self.hotkey: + logger.warning(f"Validator {self.hotkey}: Invalid 
finish_evaluation call for evaluation {evaluation_id}") return - if errored: - await evaluation.error(conn, reason) - notification_targets = None - else: - notification_targets = await evaluation.finish(conn) - - from api.src.socket.websocket_manager import WebSocketManager - ws_manager = WebSocketManager.get_instance() - await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) - - logger.info(f"Validator {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") - - # Handle notifications AFTER transaction commits - if notification_targets: - # Note: Validators typically don't trigger stage transitions, but handle any notifications - for validator in notification_targets.get("validators", []): - async with Evaluation.get_lock(): - if validator.is_available(): - success = await validator.start_evaluation_and_send(evaluation_id) - if success: - logger.info(f"Successfully assigned evaluation {evaluation_id} to validator {validator.hotkey}") - - finally: - # Single atomic reset and reassignment - async with Evaluation.get_lock(): + async with get_transaction() as conn: + agent_status = await conn.fetchval("SELECT status FROM miner_agents WHERE version_id = $1", evaluation.version_id) + if AgentStatus.from_string(agent_status) != AgentStatus.evaluating: + logger.warning(f"Validator {self.hotkey}: Agent {evaluation.version_id} not in evaluating status during finish") + return + + if errored: + await evaluation.error(conn, reason) + notification_targets = None + else: + notification_targets = await evaluation.finish(conn) + + from api.src.socket.websocket_manager import WebSocketManager + ws_manager = WebSocketManager.get_instance() + await ws_manager.send_to_all_non_validators("evaluation-finished", {"evaluation_id": evaluation_id}) + + logger.info(f"Validator {self.hotkey}: Successfully finished evaluation {evaluation_id}, errored={errored}") + + # Handle notifications AFTER transaction commits + if 
notification_targets: + # Note: Validators typically don't trigger stage transitions, but handle any notifications + for validator in notification_targets.get("validators", []): + async with Evaluation.get_lock(): + if validator.is_available(): + success = await validator.start_evaluation_and_send(evaluation_id) + if success: + logger.info(f"Successfully assigned evaluation {evaluation_id} to validator {validator.hotkey}") + + finally: self.set_available() logger.info(f"Validator {self.hotkey}: Reset to available and looking for next evaluation") await self._check_and_start_next_evaluation() diff --git a/api/src/socket/handlers/handle_heartbeat.py b/api/src/socket/handlers/handle_heartbeat.py index 1468dc77..58ee9d44 100644 --- a/api/src/socket/handlers/handle_heartbeat.py +++ b/api/src/socket/handlers/handle_heartbeat.py @@ -30,13 +30,13 @@ async def handle_heartbeat( client.status_mismatch_count += 1 logger.warning(f"Client {client.hotkey} status mismatch #{client.status_mismatch_count}: Client says {alleged_status}, but Platform says {client.status}") - if client.status_mismatch_count >= 25: - logger.error(f"Client {client.hotkey} has {client.status_mismatch_count} consecutive status mismatches. Forcing reconnection.") - await websocket.send_json({"event": "error", "error": f"Too many status mismatches ({client.status_mismatch_count}). Reconnecting..."}) - await websocket.close() - raise WebSocketDisconnect() - else: - await websocket.send_json({"event": "error", "error": f"Client status mismatch: Client says {alleged_status}, but Platform says {client.status}"}) + # if client.status_mismatch_count >= 25: + # logger.error(f"Client {client.hotkey} has {client.status_mismatch_count} consecutive status mismatches. Forcing reconnection.") + # await websocket.send_json({"event": "error", "error": f"Too many status mismatches ({client.status_mismatch_count}). 
Reconnecting..."}) + # await websocket.close() + # raise WebSocketDisconnect() + # else: + # await websocket.send_json({"event": "error", "error": f"Client status mismatch: Client says {alleged_status}, but Platform says {client.status}"}) else: # Reset counter on successful status match client.status_mismatch_count = 0 From e434bee0aa9c5c7071f9f54de7567232d59ce5da Mon Sep 17 00:00:00 2001 From: Faizan Naseer Date: Tue, 7 Oct 2025 18:26:59 -0400 Subject: [PATCH 3/3] changed mismatch limit to 100 --- api/src/socket/handlers/handle_heartbeat.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/api/src/socket/handlers/handle_heartbeat.py b/api/src/socket/handlers/handle_heartbeat.py index 58ee9d44..e7f7738b 100644 --- a/api/src/socket/handlers/handle_heartbeat.py +++ b/api/src/socket/handlers/handle_heartbeat.py @@ -30,13 +30,13 @@ async def handle_heartbeat( client.status_mismatch_count += 1 logger.warning(f"Client {client.hotkey} status mismatch #{client.status_mismatch_count}: Client says {alleged_status}, but Platform says {client.status}") - # if client.status_mismatch_count >= 25: - # logger.error(f"Client {client.hotkey} has {client.status_mismatch_count} consecutive status mismatches. Forcing reconnection.") - # await websocket.send_json({"event": "error", "error": f"Too many status mismatches ({client.status_mismatch_count}). Reconnecting..."}) - # await websocket.close() - # raise WebSocketDisconnect() - # else: - # await websocket.send_json({"event": "error", "error": f"Client status mismatch: Client says {alleged_status}, but Platform says {client.status}"}) + if client.status_mismatch_count >= 100: + logger.error(f"Client {client.hotkey} has {client.status_mismatch_count} consecutive status mismatches. Forcing reconnection.") + await websocket.send_json({"event": "error", "error": f"Too many status mismatches ({client.status_mismatch_count}). 
Reconnecting..."}) + await websocket.close() + raise WebSocketDisconnect() + else: + await websocket.send_json({"event": "error", "error": f"Client status mismatch: Client says {alleged_status}, but Platform says {client.status}"}) else: # Reset counter on successful status match client.status_mismatch_count = 0