Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions api/src/backend/queries/evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,3 @@ async def get_miner_hotkey_from_version_id(conn: asyncpg.Connection, version_id:
FROM miner_agents
WHERE version_id = $1
""", version_id)

@db_operation
async def is_screener_running_evaluation(conn: asyncpg.Connection, validator_hotkey: str) -> bool:
"""Check if a screener is running an evaluation"""
return await conn.fetchval(
"""
SELECT EXISTS(SELECT 1 FROM evaluations WHERE validator_hotkey = $1 AND status = 'running')
""",
validator_hotkey
)
3 changes: 1 addition & 2 deletions api/src/backend/queries/evaluations.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ async def get_running_evaluation_by_miner_hotkey(miner_hotkey: str) -> Optional[
async def does_validator_have_running_evaluation(validator_hotkey: str) -> bool: ...
async def get_queue_info(validator_hotkey: str, length: int = 10) -> List[Evaluation]: ...
async def get_agent_name_from_version_id(version_id: str) -> Optional[str]: ...
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
async def is_screener_running_evaluation(validator_hotkey: str) -> bool: ...
async def get_miner_hotkey_from_version_id(version_id: str) -> Optional[str]: ...
12 changes: 0 additions & 12 deletions api/src/endpoints/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,6 @@ async def post_agent(
status_code=503,
detail="No stage 1 screeners available for agent evaluation. Please try again later."
)

# Check is there is an evaluation in the database with the screener's hotkey that has running status
# This is to prevent the case where a screener.is_available() returns true but the screener is actually running an evaluation
# from api.src.backend.queries.evaluations import is_screener_running_evaluation
# is_screener_running_evaluation = await is_screener_running_evaluation(screener.hotkey)
# if is_screener_running_evaluation:
# logger.error(f"No available stage 1 screener for agent upload from miner {miner_hotkey} - screener {screener.hotkey} said it was available but there is an evaluation in the database with the screener's hotkey that has running status")
# raise HTTPException(
# status_code=409,
# detail="No stage 1 screeners available for agent evaluation. Please try again later."
# )

async with get_transaction() as conn:
can_upload = await Evaluation.check_miner_has_no_running_evaluations(conn, miner_hotkey)
if not can_upload:
Expand Down
93 changes: 47 additions & 46 deletions api/src/models/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ async def start(self, conn: asyncpg.Connection) -> List[EvaluationRun]:

async def finish(self, conn: asyncpg.Connection):
"""Finish evaluation, but retry if >=50% of inferences failed and any run errored"""

# Check if evaluation is already completed - prevent duplicate finish calls
current_status = await conn.fetchval(
"SELECT status FROM evaluations WHERE evaluation_id = $1",
self.evaluation_id
)

if current_status == 'completed':
logger.warning(
f"finish() called on already completed evaluation {self.evaluation_id} "
f"(version: {self.version_id}, validator: {self.validator_hotkey}). "
f"Skipping to prevent agent status overwrites."
)
return None

# DEBUG: Check if this evaluation already has a terminated_reason set
current_terminated_reason = await conn.fetchval(
Expand Down Expand Up @@ -484,19 +498,6 @@ async def create_screening_and_send(conn: asyncpg.Connection, agent: 'MinerAgent
"""Create screening evaluation"""
from api.src.socket.websocket_manager import WebSocketManager

# Safety check: Ensure screener doesn't already have a running evaluation
existing_evaluation = await conn.fetchrow(
"""
SELECT evaluation_id, status FROM evaluations
WHERE validator_hotkey = $1 AND status = 'running'
LIMIT 1
""",
screener.hotkey
)

if existing_evaluation:
logger.error(f"CRITICAL: Screener {screener.hotkey} already has running evaluation {existing_evaluation['evaluation_id']} - refusing to create duplicate screening")
return "", False

ws = WebSocketManager.get_instance()

Expand Down Expand Up @@ -575,7 +576,7 @@ async def screen_next_awaiting_agent(screener: "Screener"):
# Log the agents for debugging
awaiting_agents = await conn.fetch(
"""
SELECT version_id, miner_hotkey, agent_name, created_at FROM miner_agents
SELECT version_id, miner_hotkey, agent_name, created_at, version_num FROM miner_agents
WHERE status = $1
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
ORDER BY created_at ASC
Expand All @@ -585,45 +586,45 @@ async def screen_next_awaiting_agent(screener: "Screener"):
for agent in awaiting_agents[:3]: # Log first 3
logger.info(f"Awaiting stage {screener.stage} agent: {agent['agent_name']} ({agent['version_id']}) from {agent['miner_hotkey']}")

# Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED
logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'")
try:
claimed_agent = await conn.fetchrow(
"""
WITH next_agent AS (
SELECT version_id FROM miner_agents
WHERE status = $1
AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE miner_agents
SET status = $2
FROM next_agent
WHERE miner_agents.version_id = next_agent.version_id
RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at
""",
target_status,
target_screening_status
)
except Exception as e:
logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}")
claimed_agent = None
# # Atomically claim the next awaiting agent for this stage using CTE with FOR UPDATE SKIP LOCKED
# logger.debug(f"Stage {screener.stage} screener {screener.hotkey} attempting to claim agent with status '{target_status}'")
# try:
# claimed_agent = await conn.fetchrow(
# """
# WITH next_agent AS (
# SELECT version_id FROM miner_agents
# WHERE status = $1
# AND miner_hotkey NOT IN (SELECT miner_hotkey from banned_hotkeys)
# ORDER BY created_at ASC
# FOR UPDATE SKIP LOCKED
# LIMIT 1
# )
# UPDATE miner_agents
# SET status = $2
# FROM next_agent
# WHERE miner_agents.version_id = next_agent.version_id
# RETURNING miner_agents.version_id, miner_hotkey, agent_name, version_num, created_at
# """,
# target_status,
# target_screening_status
# )
# except Exception as e:
# logger.warning(f"Database error while claiming agent for screener {screener.hotkey}: {e}")
# claimed_agent = None

if not claimed_agent:
else:
screener.set_available() # Ensure available state is set
logger.info(f"No stage {screener.stage} agents claimed by screener {screener.hotkey} despite {awaiting_count} awaiting")
return

logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {claimed_agent['agent_name']} ({claimed_agent['version_id']})")
logger.info(f"Stage {screener.stage} screener {screener.hotkey} claimed agent {awaiting_agents[0]['agent_name']} ({awaiting_agents[0]['version_id']})")

agent = MinerAgent(
version_id=claimed_agent["version_id"],
miner_hotkey=claimed_agent["miner_hotkey"],
agent_name=claimed_agent["agent_name"],
version_num=claimed_agent["version_num"],
created_at=claimed_agent["created_at"],
version_id=awaiting_agents[0]["version_id"],
miner_hotkey=awaiting_agents[0]["miner_hotkey"],
agent_name=awaiting_agents[0]["agent_name"],
version_num=awaiting_agents[0]["version_num"],
created_at=awaiting_agents[0]["created_at"],
status=target_screening_status, # Already set to correct status in query
)

Expand Down
14 changes: 14 additions & 0 deletions api/src/models/screener.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,27 @@ async def get_first_available() -> Optional['Screener']:
async def get_first_available_and_reserve(stage: int) -> Optional['Screener']:
"""Atomically find and reserve first available screener for specific stage - MUST be called within Evaluation lock"""
from api.src.socket.websocket_manager import WebSocketManager
from api.src.backend.db_manager import get_db_connection
ws_manager = WebSocketManager.get_instance()

for client in ws_manager.clients.values():
if (client.get_type() == "screener" and
client.status == "available" and
client.is_available() and
client.stage == stage):

# Double-check against database to ensure screener is truly available
async with get_db_connection() as db_conn:
has_running_evaluation = await db_conn.fetchval(
"""
SELECT EXISTS(SELECT 1 FROM evaluations
WHERE validator_hotkey = $1 AND status = 'running')
""", client.hotkey
)

if has_running_evaluation:
logger.warning(f"Screener {client.hotkey} appears available in memory but has running evaluation in database - skipping")
continue

# Immediately reserve to prevent race conditions
client.status = "reserving"
Expand Down
Loading