Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion api/src/backend/queries/evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,4 +439,14 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id:
SELECT miner_hotkey
FROM miner_agents
WHERE version_id = $1
""", version_id)
""", version_id)

@db_operation
async def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool:
    """Report whether any evaluation row for this hotkey has status 'running'.

    Args:
        conn: Database connection the query is executed on.
        validator_hotkey: Hotkey matched against ``evaluations.validator_hotkey``.

    Returns:
        The value of the ``SELECT EXISTS(...)`` query as returned by
        ``conn.fetchval``.
    """
    # EXISTS stops at the first matching row, so no rows are materialised.
    running_check_sql = (
        "\n"
        "SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running')\n"
    )
    has_running_evaluation = await conn.fetchval(running_check_sql, validator_hotkey)
    return has_running_evaluation
3 changes: 2 additions & 1 deletion api/src/backend/queries/evaluations.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[
async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ...
async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ...
async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ...
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ...
11 changes: 10 additions & 1 deletion api/src/endpoints/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ async def post_agent(
detail="No stage 1 screeners available for agent evaluation. Please try again later."
)

# Check if there is an evaluation in the database with the screener's hotkey that has running status
# This is to prevent the case where a screener.is_available() returns true but the screener is actually running an evaluation
# from api.src.backend.queries.evaluations import is_screener_running_evaluation
# is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey)
# if is_screener_running_evaluation:
# logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status")
# raise HTTPException(
# status_code=409,
# detail="No stage 1 screeners available for agent evaluation. Please try again later."
# )

async with get_transaction() as conn:
can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey)
Expand Down Expand Up @@ -149,7 +159,6 @@ async def post_agent(
logger.warning(f"Failed to assign agent {agent.version_id} to screener")
else:
logger.warning(f"Failed to assign agent {agent.version_id} to screener - screener is not running")


# Screener state is now committed, lock can be released

Expand Down
98 changes: 50 additions & 48 deletions api/src/models/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,32 +106,16 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]:
async def finish(self, conn: asyncpg.Connection):
"""Finish evaluation, but retry if >=50% of inferences failed and any run errored"""

# Check if evaluation is already completed - prevent duplicate finish calls
current_status = await conn.fetchval(
"SELECT status FROM evaluations WHERE evaluation_id = $1",
self.evaluation_id
)

if current_status == 'completed':
logger.warning(
f"finish() called on already completed evaluation {self.evaluation_id} "
f"(version: {self.version_id}, validator: {self.validator_hotkey}). "
f"Skipping to prevent agent status overwrites."
)
return None

# Use the evaluation lock to prevent race conditions with disconnection handling
# async with Evaluation.get_lock():
# DEBUG: Check if this evaluation already has a terminated_reason set
# DEBUG: Check if this evaluation already has a terminated_reason set
current_terminated_reason = await conn.fetchval(
"SELECT terminated_reason FROM evaluations WHERE evaluation_id = $1",
self.evaluation_id
)
if current_terminated_reason:
# current_status = await conn.fetchval(
# "SELECT status FROM evaluations WHERE evaluation_id = $1",
# self.evaluation_id
# )
current_status = await conn.fetchval(
"SELECT status FROM evaluations WHERE evaluation_id = $1",
self.evaluation_id
)

# Print very noticeable debug information
print("=" * 80)
Expand Down Expand Up @@ -500,25 +484,19 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent
"""Create screening evaluation"""
from api.src.socket.websocket_manager import WebSocketManager

# # Additional safety check: Ensure this agent doesn't already have a running screening at the same stage (lowk useless)
# screener_stage = screener.stage
# agent_running_screening = await conn.fetchval(
# """
# SELECT COUNT(*) FROM evaluations e
# JOIN miner_agents ma ON e.version_id = ma.version_id
# WHERE ma.version_id = $1
# AND (
# (e.validator_hotkey LIKE 'screener-1-%' OR e.validator_hotkey LIKE 'i-0%')
# OR e.validator_hotkey LIKE 'screener-2-%'
# )
# AND e.status = 'running'
# """,
# agent.version_id
# )
# Safety check: Ensure screener doesn't already have a running evaluation
existing_evaluation = await conn.fetchrow(
"""
SELECT evaluation_id, status FROM evaluations
WHERE validator_hotkey = $1 AND status = 'running'
LIMIT 1
""",
screener.hotkey
)

# if agent_running_screening > 0:
# logger.error(f"CRITICAL: Agent {agent.version_id} already has running screening - refusing to create duplicate screening")
# return "", False
if existing_evaluation:
logger.error(f"CRITICAL: Screener {screener.hotkey} already has running evaluation {existing_evaluation['evaluation_id']} - refusing to create duplicate screening")
return "", False

ws = WebSocketManager.get_instance()

Expand Down Expand Up @@ -597,7 +575,7 @@ async def screen_next_awaiting_agent(screener: "Screener"):
# Log the agents for debugging
awaiting_agents = await conn.fetch(
"""
SELECT version_id, miner_hotkey, agent_name, created_at, version_num, created_at FROM miner_agents
SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents
WHERE status = $1
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
ORDER BY created_at ASC
Expand All @@ -607,20 +585,45 @@ async def screen_next_awaiting_agent(screener: "Screener"):
for agent in awaiting_agents[:3]: # Log first 3
logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}")

# Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED
logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'")
try:
claimed_agent = await conn.fetchrow(
"""
WITH next_agent AS (
SELECT version_id FROM miner_agents
WHERE status = $1
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE miner_agents
SET status = $2
FROM next_agent
WHERE miner_agents.version_id = next_agent.version_id
RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at
""",
target_status,
target_screening_status
)
except Exception as e:
logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}")
claimed_agent = None

else:
if not claimed_agent:
screener.set_available() # Ensure available state is set
logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting")
return

logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})")
logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})")

agent = MinerAgent(
version_id=awaiting_agents[0]["version_id"],
miner_hotkey=awaiting_agents[0]["miner_hotkey"],
agent_name=awaiting_agents[0]["agent_name"],
version_num=awaiting_agents[0]["version_num"],
created_at=awaiting_agents[0]["created_at"],
version_id=claimed_agent["version_id"],
miner_hotkey=claimed_agent["miner_hotkey"],
agent_name=claimed_agent["agent_name"],
version_num=claimed_agent["version_num"],
created_at=claimed_agent["created_at"],
status=target_screening_status, # Already set to correct status in query
)

Expand Down Expand Up @@ -766,7 +769,6 @@ async def handle_validator_disconnection(validator_hotkey: str):
@staticmethod
async def handle_screener_disconnection(screener_hotkey: str):
"""Atomically handle screener disconnection: error active evaluations and reset agents"""
# async with Evaluation.get_lock():
async with get_transaction() as conn:
# Get active screening evaluations for all screener types
active_screenings = await conn.fetch(
Expand Down
Loading
Loading