Skip to content

Commit 57ff9fe

Browse files
authored
Merge pull request #162 from ridgesai/revert-fix
Revert "Merge pull request #161 from ridgesai/fix-disconnect-race-con…
2 parents 768fc4c + 590cadc commit 57ff9fe

File tree

7 files changed

+158
-153
lines changed

7 files changed

+158
-153
lines changed

api/src/backend/queries/evaluations.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,4 +439,14 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id:
439439
SELECT miner_hotkey
440440
FROM miner_agents
441441
WHERE version_id = $1
442-
""", version_id)
442+
""", version_id)
443+
444+
@db_operation
async def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool:
    """Report whether the given screener currently has an evaluation in progress.

    Args:
        conn: asyncpg connection the query is executed on.
        validator_hotkey: Hotkey of the screener, matched against the
            ``validator_hotkey`` column of ``evaluations``.

    Returns:
        The value of the ``EXISTS`` query: true when at least one row in
        ``evaluations`` has this hotkey and status ``'running'``.
    """
    # Hotkey is passed as a bound parameter ($1), never interpolated into the SQL.
    running_check_sql = """
        SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running')
    """
    has_running = await conn.fetchval(running_check_sql, validator_hotkey)
    return has_running

api/src/backend/queries/evaluations.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[
1111
async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ...
1212
async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ...
1313
async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ...
14-
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
14+
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
15+
async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ...

api/src/endpoints/upload.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ async def post_agent(
112112
detail="No stage 1 screeners available for agent evaluation. Please try again later."
113113
)
114114

115+
# Check if there is an evaluation in the database with the screener's hotkey that has running status
116+
# This is to prevent the case where screener.is_available() returns true but the screener is actually running an evaluation
117+
# from api.src.backend.queries.evaluations import is_screener_running_evaluation
118+
# is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey)
119+
# if is_screener_running_evaluation:
120+
# logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status")
121+
# raise HTTPException(
122+
# status_code=409,
123+
# detail="No stage 1 screeners available for agent evaluation. Please try again later."
124+
# )
115125

116126
async with get_transaction() as conn:
117127
can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey)
@@ -149,7 +159,6 @@ async def post_agent(
149159
logger.warning(f"Failed to assign agent {agent.version_id} to screener")
150160
else:
151161
logger.warning(f"Failed to assign agent {agent.version_id} to screener - screener is not running")
152-
153162

154163
# Screener state is now committed, lock can be released
155164

api/src/models/evaluation.py

Lines changed: 50 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -106,32 +106,16 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]:
106106
async def finish(self, conn: asyncpg.Connection):
107107
"""Finish evaluation, but retry if >=50% of inferences failed and any run errored"""
108108

109-
# Check if evaluation is already completed - prevent duplicate finish calls
110-
current_status = await conn.fetchval(
111-
"SELECT status FROM evaluations WHERE evaluation_id = $1",
112-
self.evaluation_id
113-
)
114-
115-
if current_status == 'completed':
116-
logger.warning(
117-
f"finish() called on already completed evaluation {self.evaluation_id} "
118-
f"(version: {self.version_id}, validator: {self.validator_hotkey}). "
119-
f"Skipping to prevent agent status overwrites."
120-
)
121-
return None
122-
123-
# Use the evaluation lock to prevent race conditions with disconnection handling
124-
# async with Evaluation.get_lock():
125-
# DEBUG: Check if this evaluation already has a terminated_reason set
109+
# DEBUG: Check if this evaluation already has a terminated_reason set
126110
current_terminated_reason = await conn.fetchval(
127111
"SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1",
128112
self.evaluation_id
129113
)
130114
if current_terminated_reason:
131-
# current_status = await conn.fetchval(
132-
# "SELECT status FROM evaluations WHERE evaluation_id = $1",
133-
# self.evaluation_id
134-
# )
115+
current_status = await conn.fetchval(
116+
"SELECT status FROM evaluations WHERE evaluation_id = $1",
117+
self.evaluation_id
118+
)
135119

136120
# Print very noticeable debug information
137121
print("=" * 80)
@@ -500,25 +484,19 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent
500484
"""Create screening evaluation"""
501485
from api.src.socket.websocket_manager import WebSocketManager
502486

503-
# # Additional safety check: Ensure this agent doesn't already have a running screening at the same stage (lowk useless)
504-
# screener_stage = screener.stage
505-
# agent_running_screening = await conn.fetchval(
506-
# """
507-
# SELECT COUNT(*) FROM evaluations e
508-
# JOIN miner_agents ma ON e.version_id = ma.version_id
509-
# WHERE ma.version_id = $1
510-
# AND (
511-
# (e.validator_hotkey LIKE 'screener-1-%' OR e.validator_hotkey LIKE 'i-0%')
512-
# OR e.validator_hotkey LIKE 'screener-2-%'
513-
# )
514-
# AND e.status = 'running'
515-
# """,
516-
# agent.version_id
517-
# )
487+
# Safety check: Ensure screener doesn't already have a running evaluation
488+
existing_evaluation = await conn.fetchrow(
489+
"""
490+
SELECT evaluation_id, status FROM evaluations
491+
WHERE validator_hotkey = $1 AND status = 'running'
492+
LIMIT 1
493+
""",
494+
screener.hotkey
495+
)
518496

519-
# if agent_running_screening > 0:
520-
# logger.error(f"CRITICAL: Agent {agent.version_id} already has running screening - refusing to create duplicate screening")
521-
# return "", False
497+
if existing_evaluation:
498+
logger.error(f"CRITICAL: Screener {screener.hotkey} already has running evaluation {existing_evaluation['evaluation_id']} - refusing to create duplicate screening")
499+
return "", False
522500

523501
ws = WebSocketManager.get_instance()
524502

@@ -597,7 +575,7 @@ async def screen_next_awaiting_agent(screener: "Screener"):
597575
# Log the agents for debugging
598576
awaiting_agents = await conn.fetch(
599577
"""
600-
SELECT version_id, miner_hotkey, agent_name, created_at, version_num, created_at FROM miner_agents
578+
SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents
601579
WHERE status = $1
602580
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
603581
ORDER BY created_at ASC
@@ -607,20 +585,45 @@ async def screen_next_awaiting_agent(screener: "Screener"):
607585
for agent in awaiting_agents[:3]: # Log first 3
608586
logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}")
609587

588+
# Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED
589+
logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'")
590+
try:
591+
claimed_agent = await conn.fetchrow(
592+
"""
593+
WITH next_agent AS (
594+
SELECT version_id FROM miner_agents
595+
WHERE status = $1
596+
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
597+
ORDER BY created_at ASC
598+
FOR UPDATE SKIP LOCKED
599+
LIMIT 1
600+
)
601+
UPDATE miner_agents
602+
SET status = $2
603+
FROM next_agent
604+
WHERE miner_agents.version_id = next_agent.version_id
605+
RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at
606+
""",
607+
target_status,
608+
target_screening_status
609+
)
610+
except Exception as e:
611+
logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}")
612+
claimed_agent = None
610613

611-
else:
614+
if not claimed_agent:
612615
screener.set_available() # Ensure available state is set
613616
logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting")
614617
return
615618

616-
logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})")
619+
logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})")
617620

618621
agent = MinerAgent(
619-
version_id=awaiting_agents[0]["version_id"],
620-
miner_hotkey=awaiting_agents[0]["miner_hotkey"],
621-
agent_name=awaiting_agents[0]["agent_name"],
622-
version_num=awaiting_agents[0]["version_num"],
623-
created_at=awaiting_agents[0]["created_at"],
622+
version_id=claimed_agent["version_id"],
623+
miner_hotkey=claimed_agent["miner_hotkey"],
624+
agent_name=claimed_agent["agent_name"],
625+
version_num=claimed_agent["version_num"],
626+
created_at=claimed_agent["created_at"],
624627
status=target_screening_status, # Already set to correct status in query
625628
)
626629

@@ -766,7 +769,6 @@ async def handle_validator_disconnection(validator_hotkey: str):
766769
@staticmethod
767770
async def handle_screener_disconnection(screener_hotkey: str):
768771
"""Atomically handle screener disconnection: error active evaluations and reset agents"""
769-
# async with Evaluation.get_lock():
770772
async with get_transaction() as conn:
771773
# Get active screening evaluations for all screener types
772774
active_screenings = await conn.fetch(

0 commit comments

Comments
 (0)