diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3c42c396..68d47943 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -35,5 +35,5 @@ jobs: run: | black --line-length 79 --exclude '(env|venv|.eggs|.git)' --check . - # - name: Run unit tests - # run: python -m unittest discover tests + - name: Run unit tests + run: pytest ./tests/ diff --git a/alembic/versions/11691bf7d981_add_validator_requests_table.py b/alembic/versions/11691bf7d981_add_validator_requests_table.py new file mode 100644 index 00000000..ce13731f --- /dev/null +++ b/alembic/versions/11691bf7d981_add_validator_requests_table.py @@ -0,0 +1,37 @@ +"""add validator_requests table + +Revision ID: 11691bf7d981 +Revises: 09dc2532fe57 +Create Date: 2024-12-31 17:10:42.389003 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "11691bf7d981" +down_revision: Union[str, None] = "09dc2532fe57" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "validator_requests", + sa.Column("id", sa.BigInteger, primary_key=True), + sa.Column("start_time", sa.DateTime(timezone=True), nullable=False), + sa.Column("asset", sa.String, nullable=True), + sa.Column("time_increment", sa.Integer, nullable=True), + sa.Column("time_length", sa.Integer, nullable=True), + sa.Column("num_simulations", sa.Integer, nullable=True), + ) + op.create_index("ix_start_time", "validator_requests", ["start_time"]) + + +def downgrade() -> None: + op.drop_index("ix_start_time", table_name="validator_requests") + op.drop_table("validator_requests") diff --git a/alembic/versions/64c3f718c191_create_metagraph_hisotry_table.py b/alembic/versions/64c3f718c191_create_metagraph_hisotry_table.py new file mode 100644 index 00000000..04873d0c --- /dev/null +++ b/alembic/versions/64c3f718c191_create_metagraph_hisotry_table.py @@ -0,0 +1,51 @@ +"""create metagraph_hisotry table + +Revision ID: 64c3f718c191 +Revises: f45a0fe27a22 +Create Date: 2025-01-07 14:54:29.448230 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "64c3f718c191" +down_revision: Union[str, None] = "f45a0fe27a22" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "metagraph_history", + sa.Column("id", sa.BigInteger, primary_key=True, nullable=False), + sa.Column("neuron_uid", sa.Integer, nullable=False), + sa.Column("incentive", sa.Float, nullable=True), + sa.Column("rank", sa.Float, nullable=True), + sa.Column("stake", sa.Float, nullable=True), + sa.Column("trust", sa.Float, nullable=True), + sa.Column("emission", sa.Float, nullable=True), + sa.Column("coldkey", sa.String, nullable=True), + sa.Column("hotkey", sa.String, nullable=True), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + op.create_index( + "ix_metagraph_history_updated_at", "metagraph_history", ["updated_at"] + ) + op.create_index( + "ix_metagraph_history_neuron_uid", "metagraph_history", ["neuron_uid"] + ) + + +def downgrade() -> None: + op.drop_index( + "ix_metagraph_history_neuron_uid", table_name="metagraph_history" + ) + op.drop_index( + "ix_metagraph_history_updated_at", table_name="metagraph_history" + ) + op.drop_table("metagraph_history") diff --git a/alembic/versions/6778da854170_add_scored_time_to_miner_scores_table.py b/alembic/versions/6778da854170_add_scored_time_to_miner_scores_table.py new file mode 100644 index 00000000..ae4314e1 --- /dev/null +++ b/alembic/versions/6778da854170_add_scored_time_to_miner_scores_table.py @@ -0,0 +1,34 @@ +"""add scored_time to miner_scores table + +Revision ID: 6778da854170 +Revises: a9177927599a +Create Date: 2025-01-03 19:31:06.787524 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "6778da854170" +down_revision: Union[str, None] = "a9177927599a" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "miner_scores", + sa.Column("scored_time", sa.DateTime(timezone=True), nullable=True), + ) + op.create_index( + "ix_miner_scores_scored_time", "miner_scores", ["scored_time"] + ) + + +def downgrade() -> None: + op.drop_index("ix_miner_scores_scored_time", table_name="miner_scores") + op.drop_column("miner_scores", "scored_time") diff --git a/alembic/versions/7eeae448469e_create_miner_rewards_table.py b/alembic/versions/7eeae448469e_create_miner_rewards_table.py new file mode 100644 index 00000000..cf5685fd --- /dev/null +++ b/alembic/versions/7eeae448469e_create_miner_rewards_table.py @@ -0,0 +1,68 @@ +"""create miner_rewards table + +Revision ID: 7eeae448469e +Revises: 64c3f718c191 +Create Date: 2025-01-08 13:01:03.520594 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, JSON + + +# revision identifiers, used by Alembic. +revision: str = "7eeae448469e" +down_revision: Union[str, None] = "64c3f718c191" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # remove old table + op.drop_index( + "ix_miner_rewards_validation_time", table_name="miner_rewards" + ) + op.drop_index("ix_miner_rewards_miner_uid", table_name="miner_rewards") + op.drop_table("miner_rewards") + + # create new table + op.create_table( + "miner_rewards", + sa.Column("id", sa.BigInteger, primary_key=True, nullable=False), + sa.Column("miner_uid", sa.Integer, nullable=False), + sa.Column("smoothed_score", sa.Float, nullable=False), + sa.Column("reward_weight", sa.Float, nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + op.create_index( + "ix_miner_rewards_updated_at", "miner_rewards", ["updated_at"] + ) + op.create_index( + "ix_miner_rewards_miner_uid", "miner_rewards", ["miner_uid"] + ) + + +def downgrade() -> None: + op.drop_index("ix_miner_rewards_miner_uid", table_name="miner_rewards") + op.drop_index("ix_miner_rewards_updated_at", table_name="miner_rewards") + op.drop_table("miner_rewards") + + op.create_table( + "miner_rewards", + sa.Column("id", sa.BigInteger, primary_key=True, nullable=False), + sa.Column("miner_uid", sa.Integer, nullable=False), + sa.Column("scored_time", sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column("reward_details", JSONB, nullable=False), + sa.Column("reward", sa.Float, nullable=True), + sa.Column("real_prices", JSON, nullable=True), + sa.Column("prediction", JSON, nullable=True), + ) + op.create_index( + "ix_miner_rewards_scored_time", "miner_rewards", ["scored_time"] + ) + op.create_index( + "ix_miner_rewards_miner_uid", "miner_rewards", ["miner_uid"] + ) diff --git a/alembic/versions/9425131da02a_create_new_miner_predictions_table.py b/alembic/versions/9425131da02a_create_new_miner_predictions_table.py new file mode 100644 index 00000000..5cb48b85 --- /dev/null +++ b/alembic/versions/9425131da02a_create_new_miner_predictions_table.py @@ -0,0 +1,47 @@ +"""create new miner_predictions table + +Revision ID: 9425131da02a +Revises: 9468ab71357e +Create Date: 2024-12-31 18:12:53.722356 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSON + + +# revision identifiers, used by Alembic. +revision: str = "9425131da02a" +down_revision: Union[str, None] = "9468ab71357e" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "miner_predictions", + sa.Column("id", sa.BigInteger, primary_key=True), + sa.Column("validator_requests_id", sa.BigInteger, nullable=False), + sa.Column("miner_uid", sa.Integer, nullable=False), + sa.Column("prediction", JSON, nullable=False), + ) + op.create_foreign_key( + constraint_name="fk_miner_predictions_validator_requests_id", + source_table="miner_predictions", + referent_table="validator_requests", + local_cols=["validator_requests_id"], + remote_cols=["id"], + ondelete="RESTRICT", + ) + + +def downgrade() -> None: + op.drop_constraint( + "fk_miner_predictions_validator_requests_id", + "miner_predictions", + type_="foreignkey", + ) + op.drop_table("miner_predictions") diff --git a/alembic/versions/9468ab71357e_rename_tables_and_columns.py b/alembic/versions/9468ab71357e_rename_tables_and_columns.py new file mode 100644 index 00000000..b361a73e --- /dev/null +++ b/alembic/versions/9468ab71357e_rename_tables_and_columns.py @@ -0,0 +1,33 @@ +"""rename tables and columns + +Revision ID: 9468ab71357e +Revises: 11691bf7d981 +Create Date: 2024-12-31 17:56:40.448935 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "9468ab71357e" +down_revision: Union[str, None] = "11691bf7d981" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.rename_table("miner_predictions", "miner_predictions_old") + op.alter_column( + "miner_rewards", "start_time", new_column_name="scored_time" + ) + + +def downgrade() -> None: + op.alter_column( + "miner_rewards", "scored_time", new_column_name="start_time" + ) + op.rename_table("miner_predictions_old", "miner_predictions") diff --git a/alembic/versions/a1ced3e8532d_miner_scores_column_renaming.py b/alembic/versions/a1ced3e8532d_miner_scores_column_renaming.py new file mode 100644 index 00000000..e491f1f2 --- /dev/null +++ b/alembic/versions/a1ced3e8532d_miner_scores_column_renaming.py @@ -0,0 +1,33 @@ +"""miner_scores column renaming + +Revision ID: a1ced3e8532d +Revises: 7eeae448469e +Create Date: 2025-01-08 18:19:58.848962 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "a1ced3e8532d" +down_revision: Union[str, None] = "7eeae448469e" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.alter_column("miner_scores", "reward", new_column_name="prompt_score") + op.alter_column( + "miner_scores", "reward_details", new_column_name="score_details" + ) + + +def downgrade() -> None: + op.alter_column( + "miner_scores", "score_details", new_column_name="reward_details" + ) + op.alter_column("miner_scores", "prompt_score", new_column_name="reward") diff --git a/alembic/versions/a9177927599a_create_miner_scores_table.py b/alembic/versions/a9177927599a_create_miner_scores_table.py new file mode 100644 index 00000000..0461320a --- /dev/null +++ b/alembic/versions/a9177927599a_create_miner_scores_table.py @@ -0,0 +1,86 @@ +"""create miner_scores table + +Revision ID: a9177927599a +Revises: 9425131da02a +Create Date: 2024-12-31 19:31:59.207801 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, JSON + + +# revision identifiers, used by Alembic. +revision: str = "a9177927599a" +down_revision: Union[str, None] = "9425131da02a" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "validator_scores_prompts", + sa.Column("id", sa.BigInteger, primary_key=True), + sa.Column("scored_time", sa.DateTime(timezone=True), nullable=False), + sa.Column("asset", sa.String, nullable=False), + sa.Column("time_increment", sa.Integer, nullable=False), + sa.Column("time_length", sa.Integer, nullable=False), + sa.Column("num_simulations", sa.Integer, nullable=False), + ) + + op.create_table( + "miner_scores", + sa.Column("id", sa.BigInteger, primary_key=True, nullable=False), + sa.Column("miner_uid", sa.Integer, nullable=False), + sa.Column( + "validator_scores_prompts_id", sa.BigInteger, nullable=False + ), + sa.Column("miner_predictions_id", sa.BigInteger, nullable=True), + sa.Column("reward", sa.Float, nullable=True), + sa.Column("reward_details", JSONB, nullable=False), + sa.Column("real_prices", JSON, nullable=True), + ) + op.create_foreign_key( + constraint_name="fk_miner_scores_validator_scores_prompts_id", + source_table="miner_scores", + referent_table="validator_scores_prompts", + local_cols=["validator_scores_prompts_id"], + remote_cols=["id"], + ondelete="RESTRICT", + ) + op.create_foreign_key( + constraint_name="fk_miner_scores_miner_predictions_id", + source_table="miner_scores", + referent_table="miner_predictions", + local_cols=["miner_predictions_id"], + remote_cols=["id"], + ) + op.create_index("ix_miner_scores_miner_uid", "miner_scores", ["miner_uid"]) + op.create_index( + "ix_validator_scores_prompts_scored_time", + "validator_scores_prompts", + ["scored_time"], + ) + + +def downgrade() -> None: + op.drop_index( + "ix_validator_scores_prompts_scored_time", + table_name="validator_scores_prompts", + ) + op.drop_index("ix_miner_scores_miner_uid", table_name="miner_scores") + op.drop_constraint( + "fk_miner_scores_miner_predictions_id", + "miner_scores", + type_="foreignkey", + ) + op.drop_constraint( + "fk_miner_scores_validator_scores_prompts_id", + "miner_scores", + type_="foreignkey", + ) + op.drop_table("miner_scores") + op.drop_table("validator_scores_prompts") diff --git a/alembic/versions/f45a0fe27a22_remove_column_from_miner_scores_and_.py b/alembic/versions/f45a0fe27a22_remove_column_from_miner_scores_and_.py new file mode 100644 index 00000000..050c8f74 --- /dev/null +++ b/alembic/versions/f45a0fe27a22_remove_column_from_miner_scores_and_.py @@ -0,0 +1,64 @@ +"""remove column from miner_scores and validator_scores_prompts table + +Revision ID: f45a0fe27a22 +Revises: 6778da854170 +Create Date: 2025-01-03 19:55:35.633424 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "f45a0fe27a22" +down_revision: Union[str, None] = "6778da854170" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_constraint( + "fk_miner_scores_validator_scores_prompts_id", + "miner_scores", + type_="foreignkey", + ) + op.drop_column("miner_scores", "validator_scores_prompts_id") + op.drop_index( + "ix_validator_scores_prompts_scored_time", + table_name="validator_scores_prompts", + ) + op.drop_table("validator_scores_prompts") + + +def downgrade() -> None: + op.create_table( + "validator_scores_prompts", + sa.Column("id", sa.BigInteger, primary_key=True), + sa.Column("scored_time", sa.DateTime(timezone=True), nullable=False), + sa.Column("asset", sa.String, nullable=False), + sa.Column("time_increment", sa.Integer, nullable=False), + sa.Column("time_length", sa.Integer, nullable=False), + sa.Column("num_simulations", sa.Integer, nullable=False), + ) + op.create_index( + "ix_validator_scores_prompts_scored_time", + "validator_scores_prompts", + ["scored_time"], + ) + op.add_column( + "miner_scores", + sa.Column( + "validator_scores_prompts_id", sa.BigInteger, nullable=False + ), + ) + op.create_foreign_key( + constraint_name="fk_miner_scores_validator_scores_prompts_id", + source_table="miner_scores", + referent_table="validator_scores_prompts", + local_cols=["validator_scores_prompts_id"], + remote_cols=["id"], + ondelete="RESTRICT", + ) diff --git a/neurons/validator.py b/neurons/validator.py index 52c602f8..45961dd3 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -72,4 +72,4 @@ async def forward(self): with Validator() as validator: while True: bt.logging.info(f"Validator running... {time.time()}") - time.sleep(60) + time.sleep(600) diff --git a/requirements.txt b/requirements.txt index 02ef300c..078296dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ alembic>=1.14.0 python-dotenv>=1.0.1 psycopg2-binary>=2.9.10 sqlalchemy>=2.0.36 +testcontainers>=4.9.0 diff --git a/simulation/api/__init__.py b/simulation/api/__init__.py index e69de29b..7ce53171 100644 --- a/simulation/api/__init__.py +++ b/simulation/api/__init__.py @@ -0,0 +1,84 @@ +# The MIT License (MIT) +# Copyright © 2021 Yuma Rao +# Copyright © 2023 Opentensor Foundation +# Copyright © 2023 Opentensor Technologies Inc +# Copyright © 2024 Philanthrope + +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the “Software”), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of +# the Software. + +# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO +# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +from abc import ABC, abstractmethod +from typing import Any, List, Optional, Union + +import bittensor as bt + + +class SubnetsAPI(ABC): + def __init__(self, wallet: "bt.wallet"): + self.wallet = wallet + self.dendrite = bt.dendrite(wallet=wallet) + + async def __call__(self, *args, **kwargs): + return await self.query_api(*args, **kwargs) + + @abstractmethod + def prepare_synapse(self, *args, **kwargs) -> Any: + """ + Prepare the synapse-specific payload. + """ + ... + + @abstractmethod + def process_responses( + self, responses: List[Union["bt.Synapse", Any]] + ) -> Any: + """ + Process the responses from the network. + """ + ... + + async def query_api( + self, + axons: Union[bt.axon, List[bt.axon]], + deserialize: Optional[bool] = False, + timeout: Optional[int] = 12, + n: Optional[float] = 0.1, + uid: Optional[int] = None, + **kwargs: Optional[Any], + ) -> Any: + """ + Queries the API nodes of a subnet using the given synapse and bespoke query function. + + Args: + axons (Union[bt.axon, List[bt.axon]]): The list of axon(s) to query. + deserialize (bool, optional): Whether to deserialize the responses. Defaults to False. + timeout (int, optional): The timeout in seconds for the query. Defaults to 12. + n (float, optional): The fraction of top nodes to consider based on stake. Defaults to 0.1. + uid (int, optional): The specific UID of the API node to query. Defaults to None. + **kwargs: Keyword arguments for the prepare_synapse_fn. + + Returns: + Any: The result of the process_responses_fn. + """ + synapse = self.prepare_synapse(**kwargs) + bt.logging.debug( + f"Quering valdidator axons with synapse {synapse.name}..." + ) + responses = await self.dendrite( + axons=axons, + synapse=synapse, + deserialize=deserialize, + timeout=timeout, + ) + return self.process_responses(responses) diff --git a/simulation/api/example.py b/simulation/api/example.py index 7df0a450..c18126b4 100644 --- a/simulation/api/example.py +++ b/simulation/api/example.py @@ -1,3 +1,4 @@ +import bittensor import bittensor as bt from simulation.api.get_query_axons import get_query_api_axons @@ -7,14 +8,16 @@ from simulation.simulation_input import SimulationInput from simulation.utils.helpers import get_current_time +from simulation.utils.config import config # Example usage -async def test_prediction(): - wallet = bt.wallet() +async def test_prediction(args): + wallet_name = bittensor.wallet(name=args.name) + wallet = bt.wallet(wallet_name.name) # Fetch the axons of the available API nodes, or specify UIDs directly - metagraph = bt.subtensor("finney").metagraph(netuid=247) + metagraph = bt.subtensor("test").metagraph(netuid=247) uids = [uid.item() for uid in metagraph.uids if metagraph.trust[uid] > 0] @@ -45,4 +48,11 @@ async def test_prediction(): if __name__ == "__main__": - asyncio.run(test_prediction()) + import argparse + + parser = argparse.ArgumentParser(description="Generate a signature") + parser.add_argument("--message", help="The message to sign", type=str) + parser.add_argument("--name", help="The wallet name", type=str) + args = parser.parse_args() + + asyncio.run(test_prediction(args)) diff --git a/simulation/api/synth.py b/simulation/api/synth.py index d37e146b..ae667963 100644 --- a/simulation/api/synth.py +++ b/simulation/api/synth.py @@ -20,7 +20,7 @@ from typing import List, Union, Any, Dict import bittensor as bt -from bittensor.subnets import SubnetsAPI +from simulation.api import SubnetsAPI from simulation.protocol import Simulation from simulation.simulation_input import SimulationInput diff --git a/simulation/db/models.py b/simulation/db/models.py index 63339a75..997a4427 100644 --- a/simulation/db/models.py +++ b/simulation/db/models.py @@ -15,37 +15,92 @@ ) from sqlalchemy.dialects.postgresql import JSONB -# Load environment variables from .env file -load_dotenv() -# Database connection -DATABASE_URL = os.getenv("DB_URL") -engine = create_engine(DATABASE_URL) +def get_database_url(): + """Returns the database URL from environment variables.""" + load_dotenv() + return os.getenv("DB_URL") + + +def create_database_engine(): + """Creates and returns a new database engine.""" + database_url = get_database_url() + if not database_url: + raise ValueError("DB_URL is not set in environment variables.") + engine = create_engine(database_url) + return engine + + metadata = MetaData() +db_engine = None + + +def get_engine(): + """Lazy-load and return the global database engine.""" + global db_engine + if db_engine is None: + db_engine = create_database_engine() + return db_engine + # Define the table -miner_predictions = Table( - "miner_predictions", +validator_requests = Table( + "validator_requests", metadata, Column("id", BigInteger, primary_key=True), - Column("miner_uid", Integer, nullable=False), Column("start_time", DateTime(timezone=True), nullable=False), Column("asset", String, nullable=True), Column("time_increment", Integer, nullable=True), Column("time_length", Integer, nullable=True), Column("num_simulations", Integer, nullable=True), +) + +# Define the table +miner_predictions = Table( + "miner_predictions", + metadata, + Column("id", BigInteger, primary_key=True), + Column("validator_requests_id", BigInteger, nullable=False), + Column("miner_uid", Integer, nullable=False), Column("prediction", JSONB, nullable=False), ) +# Define the table +miner_scores = Table( + "miner_scores", + metadata, + Column("id", BigInteger, primary_key=True), + Column("miner_uid", Integer, nullable=False), + Column("scored_time", DateTime(timezone=True), nullable=False), + Column("miner_predictions_id", BigInteger, nullable=False), + Column("prompt_score", Float, nullable=False), + Column("score_details", JSONB, nullable=False), + Column("real_prices", JSON, nullable=False), +) + # Define the table miner_rewards = Table( "miner_rewards", metadata, Column("id", BigInteger, primary_key=True), Column("miner_uid", Integer, nullable=False), - Column("start_time", DateTime(timezone=True), nullable=False), - Column("reward_details", JSONB, nullable=False), - Column("reward", Float, nullable=False), - Column("real_prices", JSON, nullable=False), - Column("prediction", JSON, nullable=False), + Column("smoothed_score", Float, nullable=False), + Column("reward_weight", Float, nullable=False), + Column("updated_at", DateTime(timezone=True), nullable=False), +) + +# Define the table +metagraph_history = Table( + "metagraph_history", + metadata, + Column("id", BigInteger, primary_key=True), + Column("neuron_uid", Integer, nullable=False), + Column("incentive", Float, nullable=True), + Column("rank", Float, nullable=True), + Column("stake", Float, nullable=True), + Column("trust", Float, nullable=True), + Column("emission", Float, nullable=True), + Column("coldkey", String, nullable=True), + Column("hotkey", String, nullable=True), + Column("updated_at", DateTime(timezone=True), nullable=False), ) diff --git a/simulation/validator/forward.py b/simulation/validator/forward.py index ec197b46..0623b6a5 100644 --- a/simulation/validator/forward.py +++ b/simulation/validator/forward.py @@ -18,8 +18,10 @@ # DEALINGS IN THE SOFTWARE. import time +from datetime import datetime import bittensor as bt +import numpy as np from simulation.base.validator import BaseValidatorNeuron from simulation.protocol import Simulation @@ -27,6 +29,7 @@ from simulation.utils.helpers import get_current_time, round_time_to_minutes from simulation.utils.uids import check_uid_availability from simulation.validator.miner_data_handler import MinerDataHandler +from simulation.validator.moving_average import compute_weighted_averages from simulation.validator.price_data_provider import PriceDataProvider from simulation.validator.reward import get_rewards @@ -58,12 +61,27 @@ async def forward( start_time = round_time_to_minutes(current_time, 60, 60) miner_uids = [] + metagraph_info = [] for uid in range(len(self.metagraph.S)): uid_is_available = check_uid_availability( self.metagraph, uid, self.config.neuron.vpermit_tao_limit ) if uid_is_available: + metagraph_item = { + "neuron_uid": uid, + "incentive": float(self.metagraph.I[uid]), + "rank": float(self.metagraph.R[uid]), + "stake": float(self.metagraph.S[uid]), + "trust": float(self.metagraph.T[uid]), + "emission": float(self.metagraph.E[uid]), + "coldkey": self.metagraph.coldkeys[uid], + "hotkey": self.metagraph.hotkeys[uid], + "updated_at": start_time, + } miner_uids.append(uid) + metagraph_info.append(metagraph_item) + + miner_data_handler.update_metagraph_history(metagraph_info) # input data # give me prediction of BTC price for the next 1 day for every 5 min of time @@ -104,40 +122,81 @@ async def forward( # Log the results for monitoring purposes. # bt.logging.info(f"Received responses: {responses}") + miner_predictions = {} for i, response in enumerate(responses): if response is None or len(response) == 0: continue miner_id = miner_uids[i] - miner_data_handler.set_values(miner_id, response, simulation_input) + miner_predictions[miner_id] = response + + miner_data_handler.save_responses(miner_predictions, simulation_input) + + # scored_time is the same as start_time for a single validator step + # but the meaning is different + # start_time - is the time when validator asks miners for prediction data + # and stores it in the database + # scored_time - is the time when validator calculates rewards using the data + # from the database of previous prediction data + scored_time = start_time + + # get latest prediction request from validator + # for which we already have real prices data, + # i.e. (start_time + time_length) < scored_time + validator_request_id = miner_data_handler.get_latest_prediction_request( + scored_time, simulation_input + ) + if validator_request_id is None: + time.sleep(3600) # wait for an hour + return # Adjust the scores based on responses from miners. # response[0] - miner_uuids[0] # this is the function we need to implement for our incentives mechanism, # it returns an array of floats that determines how good a particular miner was at price predictions: - # example: [0.2, 0.8, 0.1] - you can see that the best miner was 2nd, and the worst 3rd + # example: [0.2, 0.7, 0.1] - you can see that the best miner was 2nd, and the worst 3rd rewards, rewards_detailed_info = get_rewards( miner_data_handler=miner_data_handler, + price_data_provider=price_data_provider, simulation_input=simulation_input, miner_uids=miner_uids, - validation_time=start_time, - price_data_provider=price_data_provider, + validator_request_id=validator_request_id, ) bt.logging.info(f"Scored responses: {rewards}") - miner_data_handler.set_reward_details(rewards_detailed_info, start_time) + miner_data_handler.set_reward_details( + reward_details=rewards_detailed_info, scored_time=scored_time + ) + + # apply custom moving average rewards + miner_scores_df = miner_data_handler.get_miner_scores(scored_time, 2) + moving_averages_data = compute_weighted_averages( + input_df=miner_scores_df, + half_life_days=1.0, + alpha=2.0, + validation_time_str=scored_time, + ) + bt.logging.info( + f"Scored responses moving averages: {moving_averages_data}" + ) + if moving_averages_data is None: + time.sleep(3600) + return + miner_data_handler.update_miner_rewards(moving_averages_data) # Update the scores based on the rewards. # You may want to define your own update_scores function for custom behavior. filtered_rewards, filtered_miner_uids = remove_zero_rewards( - rewards, miner_uids + moving_averages_data ) - self.update_scores(filtered_rewards, filtered_miner_uids) + self.update_scores(np.array(filtered_rewards), filtered_miner_uids) time.sleep(3600) # wait for an hour -def remove_zero_rewards(rewards, miner_uids): - mask = rewards != 0 - filtered_rewards = rewards[mask] - filtered_miners_uid = [miner_uids[i] for i in range(len(mask)) if mask[i]] - - return filtered_rewards, filtered_miners_uid +def remove_zero_rewards(moving_averages_data): + miners = [] + rewards = [] + for rewards_item in moving_averages_data: + if rewards_item["reward_weight"] != 0: + miners.append(rewards_item["miner_uid"]) + rewards.append(rewards_item["reward_weight"]) + return rewards, miners diff --git a/simulation/validator/miner_data_handler.py b/simulation/validator/miner_data_handler.py index dc89eeae..9fc9170e 100644 --- a/simulation/validator/miner_data_handler.py +++ b/simulation/validator/miner_data_handler.py @@ -1,110 +1,219 @@ -from datetime import datetime +from datetime import datetime, timedelta + import bittensor as bt +import pandas as pd from sqlalchemy import select, text -from simulation.db.models import engine, miner_predictions, miner_rewards +from simulation.db.models import ( + miner_predictions, + miner_scores, + validator_requests, + metagraph_history, + miner_rewards, + get_engine, +) from simulation.simulation_input import SimulationInput class MinerDataHandler: + def __init__(self, engine=None): + # Use the provided engine or fall back to the default engine + self.engine = engine or get_engine() - @staticmethod - def set_values(miner_uid, values, simulation_input: SimulationInput): - """Set values for the given miner_id and validation_time.""" + def save_responses( + self, miner_predictions_data: {}, simulation_input: SimulationInput + ): + """Save miner predictions and simulation input.""" - row_to_insert = { - "miner_uid": miner_uid, + validator_requests_row = { "start_time": simulation_input.start_time, "asset": simulation_input.asset, "time_increment": simulation_input.time_increment, "time_length": simulation_input.time_length, "num_simulations": simulation_input.num_simulations, - "prediction": values, } try: - with engine.connect() as connection: + with self.engine.connect() as connection: with connection.begin(): # Begin a transaction - insert_stmt = miner_predictions.insert().values( - row_to_insert + insert_stmt_validator_requests = ( + validator_requests.insert().values( + validator_requests_row + ) ) - connection.execute(insert_stmt) + result = connection.execute(insert_stmt_validator_requests) + validator_requests_id = result.inserted_primary_key[0] + + miner_prediction_records = [ + { + "validator_requests_id": validator_requests_id, + "miner_uid": miner_uid, + "prediction": prediction, + } + for miner_uid, prediction in miner_predictions_data.items() + ] + + insert_stmt_miner_predictions = ( + miner_predictions.insert().values( + miner_prediction_records + ) + ) + connection.execute(insert_stmt_miner_predictions) except Exception as e: - bt.logging.info(f"in set_values (got an exception): {e}") + connection.rollback() + bt.logging.error(f"in save_responses (got an exception): {e}") - @staticmethod - def set_reward_details(reward_details: [], start_time: str): + def set_reward_details(self, reward_details: [], scored_time: str): rows_to_insert = [ { "miner_uid": row["miner_uid"], - "start_time": start_time, - "reward_details": { + "scored_time": scored_time, + "miner_predictions_id": row["predictions"], + "score_details": { "score": row["score"], "softmax_score": row["softmax_score"], "crps_data": row["crps_data"], }, - "reward": row["softmax_score"], + "prompt_score": row["softmax_score"], "real_prices": row["real_prices"], - "prediction": row["predictions"], } for row in reward_details ] - with engine.begin() as connection: - try: - insert_stmt = miner_rewards.insert().values(rows_to_insert) - connection.execute(insert_stmt) - except Exception as e: - connection.rollback() - bt.logging.info( - f"in set_reward_details (got an exception): {e}" - ) + try: + with self.engine.connect() as connection: + with connection.begin(): # Begin a transaction + insert_stmt_miner_scores = miner_scores.insert().values( + rows_to_insert + ) + connection.execute(insert_stmt_miner_scores) + except Exception as e: + connection.rollback() + bt.logging.error(f"in set_reward_details (got an exception): {e}") - @staticmethod - def get_values(miner_uid: int, validation_time: str): + def get_miner_prediction(self, miner_uid: int, validator_request_id: int): """Retrieve the record with the longest valid interval for the given miner_id.""" try: - validation_time = datetime.fromisoformat(validation_time) - - with engine.connect() as connection: + with self.engine.connect() as connection: query = ( - select(miner_predictions.c.prediction) + select( + miner_predictions.c.id, miner_predictions.c.prediction + ) + .select_from(miner_predictions) .where( - ( - miner_predictions.c.start_time - + text("INTERVAL '1 second'") - * miner_predictions.c.time_length - ) - < validation_time, miner_predictions.c.miner_uid == miner_uid, + miner_predictions.c.validator_requests_id + == validator_request_id, ) - .order_by( + .limit(1) + ) + + result = connection.execute(query).fetchone() + + if result is None: + return None, [] + + record_id = result.id + prediction = result.prediction + + return record_id, prediction + except Exception as e: + bt.logging.error( + f"in get_miner_prediction (got an exception): {e}" + ) + return None, [] + + def get_latest_prediction_request( + self, scored_time_str: str, simulation_input: SimulationInput + ): + """Retrieve the id of the latest validator request that (start_time + time_length) < scored_time.""" + try: + scored_time = datetime.fromisoformat(scored_time_str) + + with self.engine.connect() as connection: + query = ( + select(validator_requests.c.id) + .select_from(validator_requests) + .where( ( - miner_predictions.c.start_time + validator_requests.c.start_time + text("INTERVAL '1 second'") - * miner_predictions.c.time_length - ).desc() + * validator_requests.c.time_length + ) + < scored_time, + validator_requests.c.asset == simulation_input.asset, + validator_requests.c.time_increment + == simulation_input.time_increment, + validator_requests.c.time_length + == simulation_input.time_length, + validator_requests.c.num_simulations + == simulation_input.num_simulations, ) + .order_by(validator_requests.c.start_time.desc()) .limit(1) ) result = connection.execute(query).fetchone() - bt.logging.info( - "in get_values, predictions fetched for miner_uid: " - + str(miner_uid) - ) + if result is None: + return None - if result is None: - return [] + return result.id + except Exception as e: + bt.logging.error( + f"in get_latest_prediction_request (got an exception): {e}" + ) + return None - bt.logging.info( - "in get_values, predictions length:" + str(len(result[0])) + def update_metagraph_history(self, metagraph_info: []): + try: + with self.engine.connect() as connection: + with connection.begin(): # Begin a transaction + insert_stmt = metagraph_history.insert().values( + metagraph_info + ) + connection.execute(insert_stmt) + except Exception as e: + connection.rollback() + bt.logging.error( + f"in update_metagraph_history (got an exception): {e}" ) - # fetchone return a tuple, so we need to return the first element, which is the prediction - return result[0] + def get_miner_scores(self, scored_time_str: str, cutoff_days: int): + scored_time = datetime.fromisoformat(scored_time_str) + min_scored_time = scored_time - timedelta(days=cutoff_days) + + try: + with self.engine.connect() as connection: + query = ( + select( + miner_scores.c.miner_uid, + miner_scores.c.prompt_score, + miner_scores.c.scored_time, + ) + .select_from(miner_scores) + .where(miner_scores.c.scored_time > min_scored_time) + ) + + result = connection.execute(query) + + df = pd.DataFrame(result.fetchall(), columns=result.keys()) + + return df except Exception as e: - bt.logging.info(f"in get_values (got an exception): {e}") - print(e) - return [] + bt.logging.error(f"in get_miner_scores (got an exception): {e}") + return pd.DataFrame() + + def update_miner_rewards(self, miner_rewards_data: []): + try: + with self.engine.connect() as connection: + with connection.begin(): # Begin a transaction + insert_stmt = miner_rewards.insert().values( + miner_rewards_data + ) + connection.execute(insert_stmt) + except Exception as e: + connection.rollback() + bt.logging.error( + f"in update_miner_rewards (got an exception): {e}" + ) diff --git a/simulation/validator/moving_average.py b/simulation/validator/moving_average.py new file mode 100644 index 00000000..2c0dee35 --- /dev/null +++ b/simulation/validator/moving_average.py @@ -0,0 +1,104 @@ +from datetime import datetime, timezone + +import pandas as pd +from pandas import DataFrame + + +def compute_weighted_averages( + input_df: DataFrame, + half_life_days: float, + alpha: float, + validation_time_str: str, +) -> []: + """ + Reads a TSV file of miner rewards, computes an exponentially weighted + moving average (EWMA) with a user-specified half-life, then outputs: + 1) The EWMA of each miner's reward + 2) EWMA^alpha, normalized across miners + + The file must have columns: + - 'miner_uid' + - 'reward' + - 'scored_time' + and be tab-separated. + + :param input_df: Dataframe of miner rewards. + :param half_life_days: The half-life in days for the exponential decay. + :param alpha: The exponent to raise the EWMA to, before normalization. + :param validation_time_str: The current time when validator does the scoring. + """ + if input_df.empty: + return None + + validation_time = datetime.fromisoformat(validation_time_str).replace( + tzinfo=timezone.utc + ) + + # Group by miner_uid + grouped = input_df.groupby("miner_uid") + + results = [] # will hold tuples of (miner_uid, ewma) + + for miner_uid, group_df in grouped: + total_weight = 0.0 + weighted_reward_sum = 0.0 + + for _, row in group_df.iterrows(): + if pd.isna(row["prompt_score"]): + continue # skip missing or invalid reward + + w = compute_weight( + row["scored_time"], validation_time, half_life_days + ) + total_weight += w + weighted_reward_sum += w * row["prompt_score"] + + ewma = ( + weighted_reward_sum / total_weight + if total_weight > 0 + else float("nan") + ) + results.append((miner_uid, ewma)) + + # Now compute EWMA^alpha for each miner and normalize + # If the EWMA is NaN, treat it as 0 for the power-sum. + miner_uids = [r[0] for r in results] + ewm_as = [r[1] for r in results] + + # Convert NaN to 0.0 for the exponent operation and sum + ewm_as_nonan = [0.0 if pd.isna(x) else x for x in ewm_as] + ewm_as_pow = [x**alpha for x in ewm_as_nonan] # raise to alpha (default=2) + + pow_sum = sum(ewm_as_pow) + + # Avoid division by zero if all are zero + if pow_sum <= 0: + norm_scores = [0.0] * len(ewm_as_pow) + else: + norm_scores = [x / pow_sum for x in ewm_as_pow] + + rewards = [] + for (miner_uid, ewma_val), norm_val in zip(results, norm_scores): + reward_item = { + "miner_uid": miner_uid, + "smoothed_score": float(ewma_val), + "reward_weight": float(norm_val), + "updated_at": validation_time_str, + } + rewards.append(reward_item) + + return rewards + + +def compute_weight( + scored_dt: datetime, validation_time: datetime, half_life_days: float +) -> float: + """ + For a row with timestamp scored_dt, the age in days is delta_days. + weight = 0.5^(delta_days / half_life_days), meaning that + after 'half_life_days' days, the weight decays to 0.5. + """ + delta_days = (validation_time - scored_dt).total_seconds() / ( + 24.0 * 3600.0 + ) + return 0.5 ** (delta_days / half_life_days) diff --git a/simulation/validator/reward.py b/simulation/validator/reward.py index eb47b9e0..a022bd72 100644 --- a/simulation/validator/reward.py +++ b/simulation/validator/reward.py @@ -34,7 +34,7 @@ def reward( price_data_provider: PriceDataProvider, miner_uid: int, simulation_input: SimulationInput, - validation_time: str, + validator_request_id: int, ): """ Reward the miner response to the simulation_input request. This method returns a reward @@ -44,17 +44,19 @@ def reward( - float: The reward value for the miner. """ - predictions = miner_data_handler.get_values(miner_uid, validation_time) + miner_prediction_id, predictions = miner_data_handler.get_miner_prediction( + miner_uid, validator_request_id + ) if predictions is None or len(predictions) == 0: - return -1, [], [], [] # represents no prediction data from the miner + return -1, [], [], None # represents no prediction data from the miner # get last time in predictions end_time = predictions[0][len(predictions[0]) - 1]["time"] real_prices = price_data_provider.fetch_data(end_time) if len(real_prices) == 0: - return -1, [], [], predictions + return -1, [], [], miner_prediction_id # in case some of the time points is not overlapped intersecting_predictions = [] @@ -77,7 +79,7 @@ def reward( simulation_input.time_increment, ) - return score, detailed_crps_data, real_prices, predictions + return score, detailed_crps_data, real_prices, miner_prediction_id def get_rewards( @@ -85,7 +87,7 @@ def get_rewards( price_data_provider: PriceDataProvider, simulation_input: SimulationInput, miner_uids: List[int], - validation_time: str, + validator_request_id: int, ) -> (np.ndarray, []): """ Returns an array of rewards for the given query and responses. @@ -111,7 +113,7 @@ def get_rewards( price_data_provider, miner_id, simulation_input, - validation_time, + validator_request_id, ) scores.append(score) detailed_crps_data_list.append(detailed_crps_data) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..9b286233 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,33 @@ +import os +import subprocess + +import pytest +from sqlalchemy import create_engine +from testcontainers.postgres import PostgresContainer + +postgres = PostgresContainer("postgres:16-alpine") + + +@pytest.fixture(scope="module", autouse=True) +def setup(request): + postgres.start() + + def remove_container(): + postgres.stop() + + request.addfinalizer(remove_container) + os.environ["DB_URL"] = postgres.get_connection_url() + + +@pytest.fixture(scope="module", autouse=True) +def apply_migrations(setup): + project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + alembic_command = ["alembic", "upgrade", "head"] + subprocess.run(alembic_command, check=True, cwd=project_root) + + +@pytest.fixture(scope="module", autouse=True) +def db_engine(setup): + engine = create_engine(os.environ["DB_URL"]) + yield engine + engine.dispose() diff --git a/tests/test_miner_data_handler.py b/tests/test_miner_data_handler.py index 93b378d2..741b5e0e 100644 --- a/tests/test_miner_data_handler.py +++ b/tests/test_miner_data_handler.py @@ -1,209 +1,251 @@ -import unittest from datetime import datetime -from simulation.validator.miner_data_handler import MinerDataHandler -from tests.utils import generate_values +import pytest + +from simulation.db.models import miner_predictions, validator_requests from simulation.simulation_input import SimulationInput +from simulation.validator.miner_data_handler import MinerDataHandler +from tests.utils import generate_values -class TestMinerDataHandler(unittest.TestCase): - def setUp(self): - """Set up a temporary file for testing.""" - self.handler = MinerDataHandler() - - def test_get_values_within_range(self): - """ - Test retrieving values within the valid time range. - 2024-11-20T00:00:00 2024-11-20T23:55:00 - |-------------------------| (Prediction range) - - 2024-11-22T00:00:00 - |-| (Current Time) - """ - miner_id = 1 - start_time = "2024-11-20T00:00:00" - current_time = "2024-11-22T00:00:00" - simulation_input = SimulationInput( - asset="BTC", - start_time=start_time, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - - values = generate_values(datetime.fromisoformat(start_time)) - self.handler.set_values(miner_id, values, simulation_input) - - result = self.handler.get_values(miner_id, current_time) - - self.assertEqual(1, len(result)) - self.assertEqual(288, len(result[0])) # Half of the 288 intervals - self.assertEqual( - {"time": "2024-11-20T12:00:00", "price": 90000}, result[0][0] - ) - self.assertEqual( - {"time": "2024-11-21T11:55:00", "price": 233500}, result[0][287] - ) - - def test_get_values_exceeding_range(self): - """ - Test retrieving values when current_time exceeds the range. - 2024-11-20T00:00:00 2024-11-20T23:55:00 - |-------------------------| (Prediction range) - - 2024-11-30T00:00:00 - |-| (Current Time - more than 5 days passed) - """ - miner_id = 1 - start_time = "2024-11-20T00:00:00" - current_time = "2024-11-30T00:00:00" - simulation_input = SimulationInput( - asset="BTC", - start_time=start_time, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - - values = generate_values(datetime.fromisoformat(start_time)) - self.handler.set_values(miner_id, values, simulation_input) - - result = self.handler.get_values(miner_id, current_time) - # self.assertEqual(result, []) # TODO: do we want this 5 days expiry? - - def test_get_values_ongoing_range(self): - """ - Test retrieving values when current_time overlaps with the range. - 2024-11-20T00:00:00 2024-11-20T23:55:00 - |-------------------------| (Prediction range) - - 2024-11-20T12:00:00 - |-| (Current Time) - """ - miner_id = 1 - start_time = "2024-11-20T00:00:00" - current_time = "2024-11-20T12:00:00" - - simulation_input = SimulationInput( - asset="BTC", - start_time=start_time, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - - values = generate_values(datetime.fromisoformat(start_time)) - self.handler.set_values(miner_id, values, simulation_input) - - result = self.handler.get_values(miner_id, current_time) - self.assertEqual(result, []) - - def test_multiple_records_for_same_miner(self): - """ - Test handling multiple records for the same miner. - Should take "Prediction range 2" as the latest one - - 2024-11-20T00:00:00 2024-11-20T23:55:00 - |-------------------------| (Prediction range 1) - - 2024-11-20T12:00:00 2024-11-21T11:55:00 - |-------------------------| (Prediction range 2) - - 2024-11-21T15:00:00 - |-| (Current Time) - """ - miner_id = 1 - start_time_1 = "2024-11-20T00:00:00" - start_time_2 = "2024-11-20T12:00:00" - current_time = "2024-11-21T15:00:00" - - simulation_input1 = SimulationInput( - asset="BTC", - start_time=start_time_1, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - simulation_input2 = SimulationInput( - asset="BTC", - start_time=start_time_2, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - - values = generate_values(datetime.fromisoformat(start_time_1)) - self.handler.set_values(miner_id, values, simulation_input1) - - values = generate_values(datetime.fromisoformat(start_time_2)) - self.handler.set_values(miner_id, values, simulation_input2) - - result = self.handler.get_values(miner_id, current_time) - - self.assertEqual(1, len(result)) - self.assertEqual(288, len(result[0])) # Half of the 288 intervals - self.assertEqual( - {"time": "2024-11-20T12:00:00", "price": 90000}, result[0][0] - ) - self.assertEqual( - {"time": "2024-11-21T11:55:00", "price": 233500}, result[0][287] - ) - - def test_multiple_records_for_same_miner_with_overlapping(self): - """ - Test handling multiple records for the same miner with overlapping records. - Should take "Prediction range 1" as the latest one - - 2024-11-20T00:00:00 2024-11-20T23:55:00 - |-------------------------| (Prediction range 1) - - 2024-11-20T12:00:00 2024-11-21T11:55:00 - |-------------------------| (Prediction range 2) - - 2024-11-21T03:00:00 - |-| (Current Time) - """ - miner_id = 1 - start_time_1 = "2024-11-20T00:00:00" - start_time_2 = "2024-11-20T12:00:00" - current_time = "2024-11-21T03:00:00" - simulation_input1 = SimulationInput( - asset="BTC", - start_time=start_time_1, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - - simulation_input2 = SimulationInput( - asset="BTC", - start_time=start_time_2, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - - values = generate_values(datetime.fromisoformat(start_time_1)) - self.handler.set_values(miner_id, values, simulation_input1) - - values = generate_values(datetime.fromisoformat(start_time_2)) - self.handler.set_values(miner_id, values, simulation_input2) - - result = self.handler.get_values(miner_id, current_time) - - self.assertEqual(1, len(result)) - self.assertEqual(288, len(result[0])) # Half of the 288 intervals - self.assertEqual( - {"time": "2024-11-20T00:00:00", "price": 90000}, result[0][0] - ) - self.assertEqual( - {"time": "2024-11-20T23:55:00", "price": 233500}, result[0][287] - ) - - def test_no_data_for_miner(self): - """Test retrieving values for a miner that doesn't exist.""" - miner_id = 0 - current_time = "2024-11-20T12:00:00" - - result = self.handler.get_values(miner_id, current_time) - self.assertEqual(result, []) +@pytest.fixture(scope="function", autouse=True) +def setup_data(db_engine): + with db_engine.connect() as connection: + with connection.begin(): + mp = miner_predictions.delete() + vr = validator_requests.delete() + connection.execute(mp) + connection.execute(vr) + + +def test_get_values_within_range(db_engine): + """ + Test retrieving values within the valid time range. + 2024-11-20T00:00:00 2024-11-20T23:55:00 + |-------------------------| (Prediction range) + + 2024-11-22T00:00:00 + |-| (Scored Time) + """ + miner_id = 1 + start_time = "2024-11-20T00:00:00" + scored_time = "2024-11-22T00:00:00" + simulation_input = SimulationInput( + asset="BTC", + start_time=start_time, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + values = generate_values(datetime.fromisoformat(start_time)) + simulation_data = {miner_id: values} + handler = MinerDataHandler(db_engine) + handler.save_responses(simulation_data, simulation_input) + + validator_request_id = handler.get_latest_prediction_request( + scored_time, simulation_input + ) + result = handler.get_miner_prediction(miner_id, validator_request_id) + + # get only second element from the result tuple + # that corresponds to the prediction result + prediction = result[1] + + assert len(prediction) == 1 + assert len(prediction[0]) == 288 + assert prediction[0][0] == {"time": "2024-11-20T00:00:00", "price": 90000} + assert prediction[0][287] == { + "time": "2024-11-20T23:55:00", + "price": 233500, + } + + +def test_get_values_ongoing_range(db_engine): + """ + Test retrieving values when current_time overlaps with the range. + 2024-11-20T00:00:00 2024-11-20T23:55:00 + |-------------------------| (Prediction range) + + 2024-11-20T12:00:00 + |-| (Scored Time) + """ + miner_id = 1 + start_time = "2024-11-20T00:00:00" + scored_time = "2024-11-20T12:00:00" + + simulation_input = SimulationInput( + asset="BTC", + start_time=start_time, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + values = generate_values(datetime.fromisoformat(start_time)) + simulation_data = {miner_id: values} + handler = MinerDataHandler(db_engine) + handler.save_responses(simulation_data, simulation_input) + + validator_request_id = handler.get_latest_prediction_request( + scored_time, simulation_input + ) + result = handler.get_miner_prediction(miner_id, validator_request_id) + + # get only second element from the result tuple + # that corresponds to the prediction result + prediction = result[1] + + assert len(prediction) == 0 + + +def test_multiple_records_for_same_miner(db_engine): + """ + Test handling multiple records for the same miner. + Should take "Prediction range 2" as the latest one + + 2024-11-20T00:00:00 2024-11-20T23:55:00 + |-------------------------| (Prediction range 1) + + 2024-11-20T12:00:00 2024-11-21T11:55:00 + |-------------------------| (Prediction range 2) + + 2024-11-21T15:00:00 + |-| (Current Time) + """ + miner_id = 1 + start_time_1 = "2024-11-20T00:00:00+00:00" + start_time_2 = "2024-11-20T12:00:00+00:00" + scored_time = "2024-11-21T15:00:00+00:00" + + simulation_input_1 = SimulationInput( + asset="BTC", + start_time=start_time_1, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + simulation_input_2 = SimulationInput( + asset="BTC", + start_time=start_time_2, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + handler = MinerDataHandler(db_engine) + + values_1 = generate_values(datetime.fromisoformat(start_time_1)) + simulation_data_1 = {miner_id: values_1} + handler.save_responses(simulation_data_1, simulation_input_1) + + values_2 = generate_values(datetime.fromisoformat(start_time_2)) + simulation_data_2 = {miner_id: values_2} + handler.save_responses(simulation_data_2, simulation_input_2) + + validator_request_id = handler.get_latest_prediction_request( + scored_time, simulation_input_1 + ) + result = handler.get_miner_prediction(miner_id, validator_request_id) + + # get only second element from the result tuple + # that corresponds to the prediction result + prediction = result[1] + + assert len(prediction) == 1 + assert len(prediction[0]) == 288 + assert prediction[0][0] == { + "time": "2024-11-20T12:00:00+00:00", + "price": 90000, + } + assert prediction[0][287] == { + "time": "2024-11-21T11:55:00+00:00", + "price": 233500, + } + + +def test_multiple_records_for_same_miner_with_overlapping(db_engine): + """ + Test handling multiple records for the same miner with overlapping records. + Should take "Prediction range 1" as the latest one + + 2024-11-20T00:00:00 2024-11-20T23:55:00 + |-------------------------| (Prediction range 1) + + 2024-11-20T12:00:00 2024-11-21T11:55:00 + |-------------------------| (Prediction range 2) + + 2024-11-21T03:00:00 + |-| (Scored Time) + """ + miner_id = 1 + start_time_1 = "2024-11-20T00:00:00+00:00" + start_time_2 = "2024-11-20T12:00:00+00:00" + scored_time = "2024-11-21T03:00:00+00:00" + + simulation_input_1 = SimulationInput( + asset="BTC", + start_time=start_time_1, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + simulation_input_2 = SimulationInput( + asset="BTC", + start_time=start_time_2, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + handler = MinerDataHandler(db_engine) + + values_1 = generate_values(datetime.fromisoformat(start_time_1)) + simulation_data_1 = {miner_id: values_1} + handler.save_responses(simulation_data_1, simulation_input_1) + + values_2 = generate_values(datetime.fromisoformat(start_time_2)) + simulation_data_2 = {miner_id: values_2} + handler.save_responses(simulation_data_2, simulation_input_2) + + validator_request_id = handler.get_latest_prediction_request( + scored_time, simulation_input_1 + ) + result = handler.get_miner_prediction(miner_id, validator_request_id) + + # get only second element from the result tuple + # that corresponds to the prediction result + prediction = result[1] + + assert len(prediction) == 1 + assert len(prediction[0]) == 288 + assert prediction[0][0] == { + "time": "2024-11-20T00:00:00+00:00", + "price": 90000, + } + assert prediction[0][287] == { + "time": "2024-11-20T23:55:00+00:00", + "price": 233500, + } + + +def test_no_data_for_miner(db_engine): + """Test retrieving values for a miner that doesn't exist.""" + scored_time = "2024-11-20T12:00:00+00:00" + + simulation_input = SimulationInput( + asset="BTC", + start_time=scored_time, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + handler = MinerDataHandler(db_engine) + + validator_request_id = handler.get_latest_prediction_request( + scored_time, simulation_input + ) + assert validator_request_id is None diff --git a/tests/test_rewards.py b/tests/test_rewards.py index 248ef4e5..fd36e8ab 100644 --- a/tests/test_rewards.py +++ b/tests/test_rewards.py @@ -1,259 +1,134 @@ -import unittest -from unittest.mock import Mock from datetime import datetime import numpy as np +import pytest from numpy.testing import assert_almost_equal, assert_equal -from simulation.validator.forward import remove_zero_rewards +from simulation.db.models import ( + miner_predictions, + validator_requests, + miner_scores, +) from simulation.simulation_input import SimulationInput +from simulation.validator.forward import remove_zero_rewards +from simulation.validator.miner_data_handler import MinerDataHandler from simulation.validator.price_data_provider import PriceDataProvider -from simulation.validator.reward import get_rewards, compute_softmax +from simulation.validator.reward import compute_softmax, get_rewards from tests.utils import generate_values -from simulation.validator.miner_data_handler import MinerDataHandler - - -class TestRewards(unittest.TestCase): - def setUp(self): - """Set up a temporary file for testing.""" - self.price_data_provider = PriceDataProvider("BTC") - self.handler = MinerDataHandler() - - def test_compute_softmax_1(self): - score_values = np.array([1000, 1500, 2000]) - expected_score = np.array([0.506, 0.307, 0.186]) - - actual_score = compute_softmax(score_values) - - assert_almost_equal(actual_score, expected_score, decimal=3) - - def test_compute_softmax_2(self): - score_values = np.array([1000, 1500, 2000, -1]) - expected_score = np.array([0.506, 0.307, 0.186, 0]) - - actual_score = compute_softmax(score_values) - - assert_almost_equal(actual_score, expected_score, decimal=3) - - def test_remove_zero_rewards(self): - rewards = np.array([0.0, 5.0, 0.0, 10.0]) - miner_uids = [0, 1, 2, 3] - - filtered_rewards, filtered_miner_uids = remove_zero_rewards( - rewards, miner_uids - ) - - assert_equal(filtered_rewards, np.array([5.0, 10.0])) - self.assertEqual(len(filtered_miner_uids), 2) - self.assertEqual(1, filtered_miner_uids[0]) - self.assertEqual(3, filtered_miner_uids[1]) - - def test_get_rewards(self): - miner_id = 0 - start_time = "2024-11-26T00:00:00" - current_time = "2024-11-28T00:00:00" - simulation_input = SimulationInput( - asset="BTC", - start_time=start_time, - time_increment=300, - time_length=86400, - num_simulations=100, - ) - values = generate_values(datetime.fromisoformat(start_time)) - self.handler.set_values(miner_id, values, simulation_input) - softmax_scores = get_rewards( - self.handler, - self.price_data_provider, - SimulationInput( - asset="BTC", - start_time=current_time, - time_increment=60, # default: 5 mins - time_length=3600, # default: 1 day - num_simulations=1, # default: 100 - ), - [miner_id], # TODO: add another test with more miners - current_time, - ) - print(softmax_scores) - # TODO: assert the scores - - def test_get_rewards_scores(self): - mock_miner_data_handler = Mock() - mock_price_data_provider = Mock() - - miner_uids = [1, 2, 3] - validation_time = "2024-11-25T20:30:00" - - def mock_get_values(miner_uid, mock_validation_time): - if miner_uid == 1: - return [ - [ - {"time": "2024-11-25T20:20:00", "price": 90000}, - {"time": "2024-11-25T20:25:00", "price": 91000}, - {"time": "2024-11-25T20:30:00", "price": 92000}, - {"time": "2024-11-25T20:35:00", "price": 92500}, - {"time": "2024-11-25T20:40:00", "price": 92600}, - {"time": "2024-11-25T20:45:00", "price": 92500}, - ], - [ - {"time": "2024-11-25T20:20:00", "price": 90500}, - {"time": "2024-11-25T20:25:00", "price": 91500}, - {"time": "2024-11-25T20:30:00", "price": 92500}, - {"time": "2024-11-25T20:35:00", "price": 93500}, - {"time": "2024-11-25T20:40:00", "price": 92900}, - {"time": "2024-11-25T20:45:00", "price": 92100}, - ], - [ - {"time": "2024-11-25T20:20:00", "price": 91500}, - {"time": "2024-11-25T20:25:00", "price": 92500}, - {"time": "2024-11-25T20:30:00", "price": 94500}, - {"time": "2024-11-25T20:35:00", "price": 90500}, - {"time": "2024-11-25T20:40:00", "price": 90900}, - {"time": "2024-11-25T20:45:00", "price": 90100}, - ], - ] - elif miner_uid == 2: - return [ - [ - {"time": "2024-11-25T20:20:00", "price": 100000}, - {"time": "2024-11-25T20:25:00", "price": 101000}, - {"time": "2024-11-25T20:30:00", "price": 102000}, - {"time": "2024-11-25T20:35:00", "price": 102500}, - {"time": "2024-11-25T20:40:00", "price": 102600}, - {"time": "2024-11-25T20:45:00", "price": 102500}, - ], - [ - {"time": "2024-11-25T20:20:00", "price": 100500}, - {"time": "2024-11-25T20:25:00", "price": 101500}, - {"time": "2024-11-25T20:30:00", "price": 102500}, - {"time": "2024-11-25T20:35:00", "price": 103500}, - {"time": "2024-11-25T20:40:00", "price": 102900}, - {"time": "2024-11-25T20:45:00", "price": 102100}, - ], - [ - {"time": "2024-11-25T20:20:00", "price": 101500}, - {"time": "2024-11-25T20:25:00", "price": 102500}, - {"time": "2024-11-25T20:30:00", "price": 104500}, - {"time": "2024-11-25T20:35:00", "price": 100500}, - {"time": "2024-11-25T20:40:00", "price": 100900}, - {"time": "2024-11-25T20:45:00", "price": 100100}, - ], - ] - elif miner_uid == 3: - return [ - [ - {"time": "2024-11-25T20:20:00", "price": 50000}, - {"time": "2024-11-25T20:25:00", "price": 51000}, - {"time": "2024-11-25T20:30:00", "price": 52000}, - {"time": "2024-11-25T20:35:00", "price": 52500}, - {"time": "2024-11-25T20:40:00", "price": 52600}, - {"time": "2024-11-25T20:45:00", "price": 52500}, - ], - [ - {"time": "2024-11-25T20:20:00", "price": 60000}, - {"time": "2024-11-25T20:25:00", "price": 61000}, - {"time": "2024-11-25T20:30:00", "price": 62000}, - {"time": "2024-11-25T20:35:00", "price": 62500}, - {"time": "2024-11-25T20:40:00", "price": 62600}, - {"time": "2024-11-25T20:45:00", "price": 62500}, - ], - [ - {"time": "2024-11-25T20:20:00", "price": 70000}, - {"time": "2024-11-25T20:25:00", "price": 71000}, - {"time": "2024-11-25T20:30:00", "price": 72000}, - {"time": "2024-11-25T20:35:00", "price": 72500}, - {"time": "2024-11-25T20:40:00", "price": 72600}, - {"time": "2024-11-25T20:45:00", "price": 72500}, - ], - ] - - mock_miner_data_handler.get_values.side_effect = mock_get_values - - mock_price_data_provider.fetch_data.return_value = [ - {"time": "2024-11-25T20:00:00", "price": 90000}, - {"time": "2024-11-25T20:05:00", "price": 91000}, - {"time": "2024-11-25T20:10:00", "price": 92000}, - {"time": "2024-11-25T20:15:00", "price": 92500}, - {"time": "2024-11-25T20:20:00", "price": 92600}, - {"time": "2024-11-25T20:25:00", "price": 92500}, - {"time": "2024-11-25T20:30:00", "price": 93500}, - ] - - simulation_input = SimulationInput( +@pytest.fixture(scope="function", autouse=True) +def setup_data(db_engine): + with db_engine.connect() as connection: + with connection.begin(): + ms = miner_scores.delete() + mp = miner_predictions.delete() + vr = validator_requests.delete() + connection.execute(ms) + connection.execute(mp) + connection.execute(vr) + + +def test_compute_softmax_1(): + score_values = np.array([1000, 1500, 2000]) + expected_score = np.array([0.506, 0.307, 0.186]) + + actual_score = compute_softmax(score_values) + + assert_almost_equal(actual_score, expected_score, decimal=3) + + +def test_compute_softmax_2(): + score_values = np.array([1000, 1500, 2000, -1]) + expected_score = np.array([0.506, 0.307, 0.186, 0]) + + actual_score = compute_softmax(score_values) + + assert_almost_equal(actual_score, expected_score, decimal=3) + + +def test_remove_zero_rewards(): + moving_average_rewards = [ + { + "miner_uid": 0, + "smoothed_score": float(0), + "reward_weight": float(0), + "updated_at": "2024-11-20T00:00:00", + }, + { + "miner_uid": 1, + "smoothed_score": float(0.2), + "reward_weight": float(0.2), + "updated_at": "2024-11-20T00:00:00", + }, + { + "miner_uid": 2, + "smoothed_score": float(0), + "reward_weight": float(0), + "updated_at": "2024-11-20T00:00:00", + }, + { + "miner_uid": 3, + "smoothed_score": float(0.8), + "reward_weight": float(0.8), + "updated_at": "2024-11-20T00:00:00", + }, + ] + + filtered_rewards, filtered_miner_uids = remove_zero_rewards( + moving_average_rewards + ) + + assert len(filtered_miner_uids) == 2 + assert len(filtered_rewards) == 2 + assert filtered_miner_uids[0] == 1 + assert filtered_miner_uids[1] == 3 + assert filtered_rewards[0] == 0.2 + assert filtered_rewards[1] == 0.8 + + +def test_get_rewards(db_engine): + miner_id = 0 + start_time = "2024-11-26T00:00:00+00:00" + scored_time = "2024-11-28T00:00:00+00:00" + + simulation_input = SimulationInput( + asset="BTC", + start_time=start_time, + time_increment=300, + time_length=86400, + num_simulations=1, + ) + + handler = MinerDataHandler(db_engine) + price_data_provider = PriceDataProvider( + "BTC" + ) # TODO: add a mock instead of the real provider + + values = generate_values(datetime.fromisoformat(start_time)) + simulation_data = {miner_id: values} + handler.save_responses(simulation_data, simulation_input) + + validator_request_id = handler.get_latest_prediction_request( + scored_time, simulation_input + ) + + softmax_scores = get_rewards( + handler, + price_data_provider, + SimulationInput( asset="BTC", - start_time=validation_time, - time_increment=300, # 5 mins - time_length=86400, # 1 day - num_simulations=3, - ) - - result = get_rewards( - miner_data_handler=mock_miner_data_handler, - price_data_provider=mock_price_data_provider, - simulation_input=simulation_input, - miner_uids=miner_uids, - validation_time=validation_time, - ) - - print(result) - - def test_get_rewards_scores_if_one_of_the_miners_returns_no_data(self): - mock_miner_data_handler = Mock() - mock_price_data_provider = Mock() - - miner_uids = [1, 2, 3] - validation_time = "2024-11-25T20:30:00" - - def mock_get_values(miner_uid, mock_validation_time): - if miner_uid == 1: - return [ - {"time": "2024-11-25T20:20:00", "price": 90000}, - {"time": "2024-11-25T20:25:00", "price": 91000}, - {"time": "2024-11-25T20:30:00", "price": 92000}, - {"time": "2024-11-25T20:35:00", "price": 92500}, - {"time": "2024-11-25T20:40:00", "price": 92600}, - {"time": "2024-11-25T20:45:00", "price": 92500}, - ] - elif miner_uid == 2: - return [] - elif miner_uid == 3: - return [ - {"time": "2024-11-25T20:20:00", "price": 50000}, - {"time": "2024-11-25T20:25:00", "price": 51000}, - {"time": "2024-11-25T20:30:00", "price": 52000}, - {"time": "2024-11-25T20:35:00", "price": 52500}, - {"time": "2024-11-25T20:40:00", "price": 52600}, - {"time": "2024-11-25T20:45:00", "price": 52500}, - ] - - mock_miner_data_handler.get_values.side_effect = mock_get_values - - mock_price_data_provider.fetch_data.return_value = [ - {"time": "2024-11-25T20:00:00", "price": 90000}, - {"time": "2024-11-25T20:05:00", "price": 91000}, - {"time": "2024-11-25T20:10:00", "price": 92000}, - {"time": "2024-11-25T20:15:00", "price": 92500}, - {"time": "2024-11-25T20:20:00", "price": 92600}, - {"time": "2024-11-25T20:25:00", "price": 92500}, - {"time": "2024-11-25T20:30:00", "price": 93500}, - ] - - simulation_input = SimulationInput( - asset="BTC", - start_time=validation_time, - time_increment=300, # default: 5 mins - time_length=86400, # default: 1 day + start_time=scored_time, + time_increment=60, # default: 5 mins + time_length=3600, # default: 1 day num_simulations=1, # default: 100 - ) - - result = get_rewards( - miner_data_handler=mock_miner_data_handler, - price_data_provider=mock_price_data_provider, - simulation_input=simulation_input, - miner_uids=miner_uids, - validation_time=validation_time, - ) - - print(result) + ), + [miner_id], # TODO: add another test with more miners + validator_request_id, + ) + + assert len(softmax_scores) == 2 + assert softmax_scores[1][0]["miner_uid"] == miner_id + assert softmax_scores[1][0]["softmax_score"] == 1.0 + assert len(softmax_scores[1][0]["crps_data"]) == 72 + # TODO: assert the scores