diff --git a/api/src/endpoints/scoring.py b/api/src/endpoints/scoring.py
index 26218b08..6bd0f31c 100644
--- a/api/src/endpoints/scoring.py
+++ b/api/src/endpoints/scoring.py
@@ -17,7 +17,7 @@
 from api.src.backend.queries.agents import get_top_agent, ban_agents as db_ban_agents, approve_agent_version, get_agent_by_version_id as db_get_agent_by_version_id
 from api.src.backend.entities import MinerAgentScored
 from api.src.backend.db_manager import get_transaction, new_db, get_db_connection
-from api.src.utils.refresh_subnet_hotkeys import check_if_hotkey_is_registered
+from api.src.utils.subtensor import get_subnet_hotkeys, check_if_hotkey_is_registered
 from api.src.utils.slack import notify_unregistered_top_miner, notify_unregistered_treasury_hotkey
 from api.src.backend.internal_tools import InternalTools
 from api.src.backend.entities import TreasuryTransaction
@@ -71,7 +71,7 @@ async def get_treasury_hotkey():
         raise ValueError("No active treasury wallets found in database")
 
     treasury_hotkey = treasury_hotkey_data[0]["hotkey"]
-    if not check_if_hotkey_is_registered(treasury_hotkey):
+    if not await check_if_hotkey_is_registered(treasury_hotkey):
         logger.error(f"Treasury hotkey {treasury_hotkey} not registered on subnet")
         await notify_unregistered_treasury_hotkey(treasury_hotkey)
 
@@ -100,7 +100,7 @@ async def weights() -> Dict[str, float]:
     if top_agent.miner_hotkey.startswith("open-"):
         weights[treasury_hotkey] = weight_left
     else:
-        if check_if_hotkey_is_registered(top_agent.miner_hotkey):
+        if await check_if_hotkey_is_registered(top_agent.miner_hotkey):
             weights[top_agent.miner_hotkey] = weight_left
         else:
             logger.error(f"Top agent {top_agent.miner_hotkey} not registered on subnet")
diff --git a/api/src/main.py b/api/src/main.py
index 86a3cab0..2bfec40f 100644
--- a/api/src/main.py
+++ b/api/src/main.py
@@ -24,6 +24,7 @@
 from api.src.endpoints.benchmarks import router as benchmarks_router
 from api.src.utils.slack import send_slack_message
 from api.src.utils.config import WHITELISTED_VALIDATOR_IPS
+from api.src.utils.hotkey_subscription import start_hotkey_subscription, stop_hotkey_subscription
 
 logger = get_logger(__name__)
 
@@ -56,8 +57,13 @@ async def lifespan(app: FastAPI):
     # Start background tasks
     asyncio.create_task(run_weight_setting_loop(30))
 
+    # Start hotkey subscription service
+    asyncio.create_task(start_hotkey_subscription())
+
     yield
 
+    # Stop hotkey subscription service
+    await stop_hotkey_subscription()
     await new_db.close()
 
 app = FastAPI(lifespan=lifespan)
diff --git a/api/src/utils/hotkey_subscription.py b/api/src/utils/hotkey_subscription.py
new file mode 100644
index 00000000..304a949e
--- /dev/null
+++ b/api/src/utils/hotkey_subscription.py
@@ -0,0 +1,143 @@
+import json
+import os
+import time
+import threading
+import asyncio
+from typing import Optional, List, Any
+from substrateinterface import SubstrateInterface
+from pathlib import Path
+from loggers.logging_utils import get_logger
+
+logger = get_logger(__name__)
+
+NETUID = os.getenv("NETUID", "62")
+SUBTENSOR_URL = os.getenv("SUBTENSOR_ADDRESS", "ws://127.0.0.1:9945")
+CACHE_FILE = Path("subnet_hotkeys_cache.json")
+
+_subscription_thread: Optional[threading.Thread] = None
+_polling_task: Optional[asyncio.Task] = None
+
+def _update_cache() -> None:
+    """Fetch the subnet's hotkeys from the chain and rewrite CACHE_FILE.
+
+    Blocking (network and file I/O), so async callers run it in a worker
+    thread. Failures are logged; the existing cache file is left in place.
+    """
+    substrate = None
+    try:
+        substrate = SubstrateInterface(url=SUBTENSOR_URL, ss58_format=42, type_registry_preset="substrate-node-template")
+        result = substrate.query_map("SubtensorModule", "Uids", [NETUID])
+
+        hotkeys = []
+        for uid_data in result:
+            try:
+                hotkey = uid_data[0]
+                if hasattr(hotkey, 'value'):
+                    hotkey = hotkey.value
+                if isinstance(hotkey, bytes):
+                    hotkey = substrate.ss58_encode(hotkey)
+                hotkeys.append(hotkey)
+            except Exception as e:
+                # Skip the entry that could not be decoded, but leave a trace.
+                logger.warning(f"Skipping unreadable hotkey entry: {e}")
+
+        # Write to a temp file and rename it over the cache in one step.
+        temp_file = CACHE_FILE.with_suffix('.tmp')
+        with open(temp_file, 'w') as f:
+            json.dump({"hotkeys": hotkeys, "timestamp": time.time(), "count": len(hotkeys)}, f)
+        temp_file.replace(CACHE_FILE)
+
+        logger.info(f"Updated cache with {len(hotkeys)} hotkeys")
+    except Exception as e:
+        logger.error(f"Failed to update cache: {e}")
+    finally:
+        # Release the connection on failure paths as well.
+        if substrate is not None:
+            try:
+                substrate.close()
+            except Exception:
+                pass
+
+def _run_subscription_thread() -> None:
+    """Thread body: watch SubnetworkN and refresh the cache on each update.
+
+    Reconnects in a loop while the module-level thread handle is set;
+    stop_hotkey_subscription() clears that handle.
+    """
+    def _on_update(obj, update_nr, subscription_id):
+        logger.info(f"SubnetworkN changed (update #{update_nr}), refreshing hotkey cache")
+        # Returning None keeps the subscription open.
+        _update_cache()
+
+    while _subscription_thread and _subscription_thread.is_alive():
+        substrate = None
+        try:
+            substrate = SubstrateInterface(url=SUBTENSOR_URL, ss58_format=42, type_registry_preset="substrate-node-template")
+            # Use query with subscription_handler to monitor SubnetworkN
+            # This storage item tracks the number of UIDs and changes when miners register/deregister
+            # https://docs.learnbittensor.org/subtensor-nodes/subtensor-storage-query-examples?#114-subnetworkn
+            substrate.query(
+                module="SubtensorModule",
+                storage_function="SubnetworkN",
+                params=[NETUID],
+                subscription_handler=_on_update
+            )
+        except Exception as e:
+            # Filter out expected connection errors like:
+            # "Expecting value: line 1 column 1 (char 0)", "Connection closed", "WebSocket connection is closed"
+            if not any(x in str(e).lower() for x in ["expecting value", "json", "connection", "closed"]):
+                logger.error(f"Subscription error: {e}")
+            # Back off before reconnecting, whatever the error was.
+            time.sleep(5)
+        finally:
+            if substrate is not None:
+                try:
+                    substrate.close()
+                except Exception:
+                    pass
+
+async def _polling_loop() -> None:
+    """Poll for hotkey updates every 15 minutes as a backup to subscription."""
+    while _polling_task and not _polling_task.cancelled():
+        try:
+            await asyncio.sleep(15 * 60)  # 15 minutes
+            if _polling_task and not _polling_task.cancelled():
+                logger.info("Periodic hotkey cache refresh (15min polling)")
+                # _update_cache blocks; keep it off the event loop.
+                await asyncio.get_running_loop().run_in_executor(None, _update_cache)
+        except asyncio.CancelledError:
+            break
+        except Exception as e:
+            logger.error(f"Polling error: {e}")
+
+async def start_hotkey_subscription() -> None:
+    """Fill the cache once, then start the subscription thread and polling task.
+
+    Does nothing if the subscription thread is already running.
+    """
+    global _subscription_thread, _polling_task
+    if _subscription_thread and _subscription_thread.is_alive():
+        return
+    logger.info("Starting hotkey subscription service with 15min polling backup")
+    # Initial fill runs in a worker thread so startup does not stall the event loop.
+    await asyncio.get_running_loop().run_in_executor(None, _update_cache)
+    # Another caller may have started the service while we were awaiting.
+    if _subscription_thread and _subscription_thread.is_alive():
+        return
+    _subscription_thread = threading.Thread(target=_run_subscription_thread, daemon=True)
+    _subscription_thread.start()
+    _polling_task = asyncio.create_task(_polling_loop())
+
+async def stop_hotkey_subscription() -> None:
+    """Clear the thread handle and cancel the polling task.
+
+    The thread is not joined; it is a daemon and leaves its loop the next
+    time it re-checks the handle.
+    """
+    global _subscription_thread, _polling_task
+    if _subscription_thread:
+        logger.info("Stopped hotkey subscription service")
+        _subscription_thread = None
+    if _polling_task:
+        _polling_task.cancel()
+        _polling_task = None
diff --git a/api/src/utils/subtensor.py b/api/src/utils/subtensor.py
index a8c5c696..fd1a5e31 100644
--- a/api/src/utils/subtensor.py
+++ b/api/src/utils/subtensor.py
@@ -22,8 +22,17 @@ async def get_subnet_hotkeys():
             data = json.loads(content)
             return data["hotkeys"]
         else:
-            logger.warning("Hotkeys cache file does not exist. Make sure refresh_subnet_hotkeys.py service is running.")
+            logger.warning("Hotkeys cache file does not exist. Make sure hotkey subscription service is running.")
             return []
     except Exception as e:
         logger.error(f"Error reading hotkeys cache: {e}")
         return []
+
+async def check_if_hotkey_is_registered(hotkey: str) -> bool:
+    """Check if a hotkey is registered on the subnet by looking in the cache."""
+    try:
+        hotkeys = await get_subnet_hotkeys()
+        return hotkey in hotkeys
+    except Exception as e:
+        logger.error(f"Error checking if hotkey is registered: {e}")
+        return False