Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/src/endpoints/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from api.src.backend.queries.agents import get_top_agent, ban_agents as db_ban_agents, approve_agent_version, get_agent_by_version_id as db_get_agent_by_version_id
from api.src.backend.entities import MinerAgentScored
from api.src.backend.db_manager import get_transaction, new_db, get_db_connection
from api.src.utils.refresh_subnet_hotkeys import check_if_hotkey_is_registered
from api.src.utils.subtensor import get_subnet_hotkeys, check_if_hotkey_is_registered
from api.src.utils.slack import notify_unregistered_top_miner, notify_unregistered_treasury_hotkey
from api.src.backend.internal_tools import InternalTools
from api.src.backend.entities import TreasuryTransaction
Expand Down Expand Up @@ -71,7 +71,7 @@ async def get_treasury_hotkey():
raise ValueError("No active treasury wallets found in database")
treasury_hotkey = treasury_hotkey_data[0]["hotkey"]

if not check_if_hotkey_is_registered(treasury_hotkey):
if not await check_if_hotkey_is_registered(treasury_hotkey):
logger.error(f"Treasury hotkey {treasury_hotkey} not registered on subnet")
await notify_unregistered_treasury_hotkey(treasury_hotkey)

Expand Down Expand Up @@ -100,7 +100,7 @@ async def weights() -> Dict[str, float]:
if top_agent.miner_hotkey.startswith("open-"):
weights[treasury_hotkey] = weight_left
else:
if check_if_hotkey_is_registered(top_agent.miner_hotkey):
if await check_if_hotkey_is_registered(top_agent.miner_hotkey):
weights[top_agent.miner_hotkey] = weight_left
else:
logger.error(f"Top agent {top_agent.miner_hotkey} not registered on subnet")
Expand Down
6 changes: 6 additions & 0 deletions api/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from api.src.endpoints.benchmarks import router as benchmarks_router
from api.src.utils.slack import send_slack_message
from api.src.utils.config import WHITELISTED_VALIDATOR_IPS
from api.src.utils.hotkey_subscription import start_hotkey_subscription, stop_hotkey_subscription

logger = get_logger(__name__)

Expand Down Expand Up @@ -56,8 +57,13 @@ async def lifespan(app: FastAPI):
# Start background tasks
asyncio.create_task(run_weight_setting_loop(30))

# Start hotkey subscription service
asyncio.create_task(start_hotkey_subscription())

yield

# Stop hotkey subscription service
await stop_hotkey_subscription()
await new_db.close()

app = FastAPI(lifespan=lifespan)
Expand Down
105 changes: 105 additions & 0 deletions api/src/utils/hotkey_subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import json
import os
import time
import threading
import asyncio
from typing import Optional, List, Any
from substrateinterface import SubstrateInterface
from pathlib import Path
from loggers.logging_utils import get_logger

logger = get_logger(__name__)

NETUID = os.getenv("NETUID", "62")
SUBTENSOR_URL = os.getenv("SUBTENSOR_ADDRESS", "ws://127.0.0.1:9945")
CACHE_FILE = Path("subnet_hotkeys_cache.json")

_subscription_thread: Optional[threading.Thread] = None
_polling_task: Optional[asyncio.Task] = None

def _update_cache() -> None:
try:
substrate = SubstrateInterface(url=SUBTENSOR_URL, ss58_format=42, type_registry_preset="substrate-node-template")
result = substrate.query_map("SubtensorModule", "Uids", [NETUID])

hotkeys = []
for uid_data in result:
try:
hotkey = uid_data[0]
if hasattr(hotkey, 'value'):
hotkey = hotkey.value
if isinstance(hotkey, bytes):
hotkey = substrate.ss58_encode(hotkey)
hotkeys.append(hotkey)
except:
continue

temp_file = CACHE_FILE.with_suffix('.tmp')
with open(temp_file, 'w') as f:
json.dump({"hotkeys": hotkeys, "timestamp": time.time(), "count": len(hotkeys)}, f)
temp_file.replace(CACHE_FILE)

logger.info(f"Updated cache with {len(hotkeys)} hotkeys")
substrate.close()
except Exception as e:
logger.error(f"Failed to update cache: {e}")

def _run_subscription_thread() -> None:
while _subscription_thread and _subscription_thread.is_alive():
try:
substrate = SubstrateInterface(url=SUBTENSOR_URL, ss58_format=42, type_registry_preset="substrate-node-template")
# Use query with subscription_handler to monitor SubnetworkN
# This storage item tracks the number of UIDs and changes when miners register/deregister
# https://docs.learnbittensor.org/subtensor-nodes/subtensor-storage-query-examples?#114-subnetworkn
substrate.query(
module="SubtensorModule",
storage_function="SubnetworkN",
params=[NETUID],
subscription_handler=lambda obj, update_nr, _: (
logger.info(f"SubnetworkN changed (update #{update_nr}), refreshing hotkey cache"),
_update_cache()
)[1]
)
except Exception as e:
# Filter out expected connection errors like:
# "Expecting value: line 1 column 1 (char 0)", "Connection closed", "WebSocket connection is closed"
if not any(x in str(e).lower() for x in ["expecting value", "json", "connection", "closed"]):
logger.error(f"Subscription error: {e}")
time.sleep(5)
finally:
try:
substrate.close()
except:
pass

async def _polling_loop() -> None:
"""Poll for hotkey updates every 15 minutes as a backup to subscription."""
while _polling_task and not _polling_task.cancelled():
try:
await asyncio.sleep(15 * 60) # 15 minutes
if _polling_task and not _polling_task.cancelled():
logger.info("Periodic hotkey cache refresh (15min polling)")
_update_cache()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Polling error: {e}")

async def start_hotkey_subscription() -> None:
global _subscription_thread, _polling_task
if _subscription_thread and _subscription_thread.is_alive():
return
logger.info("Starting hotkey subscription service with 15min polling backup")
_update_cache()
_subscription_thread = threading.Thread(target=_run_subscription_thread, daemon=True)
_subscription_thread.start()
_polling_task = asyncio.create_task(_polling_loop())

async def stop_hotkey_subscription() -> None:
global _subscription_thread, _polling_task
if _subscription_thread:
logger.info("Stopped hotkey subscription service")
_subscription_thread = None
if _polling_task:
_polling_task.cancel()
_polling_task = None
11 changes: 10 additions & 1 deletion api/src/utils/subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@ async def get_subnet_hotkeys():
data = json.loads(content)
return data["hotkeys"]
else:
logger.warning("Hotkeys cache file does not exist. Make sure refresh_subnet_hotkeys.py service is running.")
logger.warning("Hotkeys cache file does not exist. Make sure hotkey subscription service is running.")
return []
except Exception as e:
logger.error(f"Error reading hotkeys cache: {e}")
return []

async def check_if_hotkey_is_registered(hotkey: str) -> bool:
"""Check if a hotkey is registered on the subnet by looking in the cache."""
try:
hotkeys = await get_subnet_hotkeys()
return hotkey in hotkeys
except Exception as e:
logger.error(f"Error checking if hotkey is registered: {e}")
return False
Loading