Skip to content

Commit 768fc4c

Browse files
authored
Merge pull request #161 from ridgesai/fix-disconnect-race-condition
Fix disconnect race condition + awaiting_screening_1 issues
2 parents 385f9a1 + e434bee commit 768fc4c

File tree

7 files changed

+153
-158
lines changed

7 files changed

+153
-158
lines changed

api/src/backend/queries/evaluations.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -439,14 +439,4 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id:
439439
SELECT miner_hotkey
440440
FROM miner_agents
441441
WHERE version_id = $1
442-
""", version_id)
443-
444-
@db_operation
445-
async def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool:
446-
"""Check if a screener is running an evaluation"""
447-
return await conn.fetchval(
448-
"""
449-
SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running')
450-
""",
451-
validator_hotkey
452-
)
442+
""", version_id)

api/src/backend/queries/evaluations.pyi

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,4 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[
1111
async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ...
1212
async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ...
1313
async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ...
14-
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
15-
async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ...
14+
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...

api/src/endpoints/upload.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,6 @@ async def post_agent(
112112
detail="No stage 1 screeners available for agent evaluation. Please try again later."
113113
)
114114

115-
# Check if there is an evaluation in the database with the screener's hotkey that has running status
116-
# This is to prevent the case where a screener.is_available() returns true but the screener is actually running an evaluation
117-
# from api.src.backend.queries.evaluations import is_screener_running_evaluation
118-
# is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey)
119-
# if is_screener_running_evaluation:
120-
# logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status")
121-
# raise HTTPException(
122-
# status_code=409,
123-
# detail="No stage 1 screeners available for agent evaluation. Please try again later."
124-
# )
125115

126116
async with get_transaction() as conn:
127117
can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey)
@@ -159,6 +149,7 @@ async def post_agent(
159149
logger.warning(f"Failed to assign agent {agent.version_id} to screener")
160150
else:
161151
logger.warning(f"Failed to assign agent {agent.version_id} to screener - screener is not running")
152+
162153

163154
# Screener state is now committed, lock can be released
164155

api/src/models/evaluation.py

Lines changed: 48 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,32 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]:
106106
async def finish(self, conn: asyncpg.Connection):
107107
"""Finish evaluation, but retry if >=50% of inferences failed and any run errored"""
108108

109-
# DEBUG: Check if this evaluation already has a terminated_reason set
109+
# Check if evaluation is already completed - prevent duplicate finish calls
110+
current_status = await conn.fetchval(
111+
"SELECT status FROM evaluations WHERE evaluation_id = $1",
112+
self.evaluation_id
113+
)
114+
115+
if current_status == 'completed':
116+
logger.warning(
117+
f"finish() called on already completed evaluation {self.evaluation_id} "
118+
f"(version: {self.version_id}, validator: {self.validator_hotkey}). "
119+
f"Skipping to prevent agent status overwrites."
120+
)
121+
return None
122+
123+
# Use the evaluation lock to prevent race conditions with disconnection handling
124+
# async with Evaluation.get_lock():
125+
# DEBUG: Check if this evaluation already has a terminated_reason set
110126
current_terminated_reason = await conn.fetchval(
111127
"SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1",
112128
self.evaluation_id
113129
)
114130
if current_terminated_reason:
115-
current_status = await conn.fetchval(
116-
"SELECT status FROM evaluations WHERE evaluation_id = $1",
117-
self.evaluation_id
118-
)
131+
# current_status = await conn.fetchval(
132+
# "SELECT status FROM evaluations WHERE evaluation_id = $1",
133+
# self.evaluation_id
134+
# )
119135

120136
# Print very noticeable debug information
121137
print("=" * 80)
@@ -484,19 +500,25 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent
484500
"""Create screening evaluation"""
485501
from api.src.socket.websocket_manager import WebSocketManager
486502

487-
# Safety check: Ensure screener doesn't already have a running evaluation
488-
existing_evaluation = await conn.fetchrow(
489-
"""
490-
SELECT evaluation_id, status FROM evaluations
491-
WHERE validator_hotkey = $1 AND status = 'running'
492-
LIMIT 1
493-
""",
494-
screener.hotkey
495-
)
503+
# # Additional safety check: Ensure this agent doesn't already have a running screening at the same stage (lowk useless)
504+
# screener_stage = screener.stage
505+
# agent_running_screening = await conn.fetchval(
506+
# """
507+
# SELECT COUNT(*) FROM evaluations e
508+
# JOIN miner_agents ma ON e.version_id = ma.version_id
509+
# WHERE ma.version_id = $1
510+
# AND (
511+
# (e.validator_hotkey LIKE 'screener-1-%' OR e.validator_hotkey LIKE 'i-0%')
512+
# OR e.validator_hotkey LIKE 'screener-2-%'
513+
# )
514+
# AND e.status = 'running'
515+
# """,
516+
# agent.version_id
517+
# )
496518

497-
if existing_evaluation:
498-
logger.error(f"CRITICAL: Screener {screener.hotkey} already has running evaluation {existing_evaluation['evaluation_id']} - refusing to create duplicate screening")
499-
return "", False
519+
# if agent_running_screening > 0:
520+
# logger.error(f"CRITICAL: Agent {agent.version_id} already has running screening - refusing to create duplicate screening")
521+
# return "", False
500522

501523
ws = WebSocketManager.get_instance()
502524

@@ -575,7 +597,7 @@ async def screen_next_awaiting_agent(screener: "Screener"):
575597
# Log the agents for debugging
576598
awaiting_agents = await conn.fetch(
577599
"""
578-
SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents
600+
SELECT version_id, miner_hotkey, agent_name, created_at, version_num, created_at FROM miner_agents
579601
WHERE status = $1
580602
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
581603
ORDER BY created_at ASC
@@ -585,45 +607,20 @@ async def screen_next_awaiting_agent(screener: "Screener"):
585607
for agent in awaiting_agents[:3]: # Log first 3
586608
logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}")
587609

588-
# Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED
589-
logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'")
590-
try:
591-
claimed_agent = await conn.fetchrow(
592-
"""
593-
WITH next_agent AS (
594-
SELECT version_id FROM miner_agents
595-
WHERE status = $1
596-
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
597-
ORDER BY created_at ASC
598-
FOR UPDATE SKIP LOCKED
599-
LIMIT 1
600-
)
601-
UPDATE miner_agents
602-
SET status = $2
603-
FROM next_agent
604-
WHERE miner_agents.version_id = next_agent.version_id
605-
RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at
606-
""",
607-
target_status,
608-
target_screening_status
609-
)
610-
except Exception as e:
611-
logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}")
612-
claimed_agent = None
613610

614-
if not claimed_agent:
611+
else:
615612
screener.set_available() # Ensure available state is set
616613
logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting")
617614
return
618615

619-
logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})")
616+
logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})")
620617

621618
agent = MinerAgent(
622-
version_id=claimed_agent["version_id"],
623-
miner_hotkey=claimed_agent["miner_hotkey"],
624-
agent_name=claimed_agent["agent_name"],
625-
version_num=claimed_agent["version_num"],
626-
created_at=claimed_agent["created_at"],
619+
version_id=awaiting_agents[0]["version_id"],
620+
miner_hotkey=awaiting_agents[0]["miner_hotkey"],
621+
agent_name=awaiting_agents[0]["agent_name"],
622+
version_num=awaiting_agents[0]["version_num"],
623+
created_at=awaiting_agents[0]["created_at"],
627624
status=target_screening_status, # Already set to correct status in query
628625
)
629626

@@ -769,6 +766,7 @@ async def handle_validator_disconnection(validator_hotkey: str):
769766
@staticmethod
770767
async def handle_screener_disconnection(screener_hotkey: str):
771768
"""Atomically handle screener disconnection: error active evaluations and reset agents"""
769+
# async with Evaluation.get_lock():
772770
async with get_transaction() as conn:
773771
# Get active screening evaluations for all screener types
774772
active_screenings = await conn.fetch(

0 commit comments

Comments
 (0)