From 3a009d92e687eb60d00b02e2609986ebb7c5b9db Mon Sep 17 00:00:00 2001 From: Pyython Date: Fri, 3 May 2024 14:53:02 -0400 Subject: [PATCH 01/11] use block hash to make steps deterministic --- finetune/dataset.py | 143 ++++++++++++++++++++++++------------------- neurons/validator.py | 14 +++-- 2 files changed, 91 insertions(+), 66 deletions(-) diff --git a/finetune/dataset.py b/finetune/dataset.py index 2cd47cb..2d11d5b 100644 --- a/finetune/dataset.py +++ b/finetune/dataset.py @@ -216,81 +216,100 @@ ] class CortexSubsetLoader(IterableDataset): - def __init__(self, latest=True, random_seed: typing.Optional[int] = None, - max_samples=300, steps: typing.Optional[int]=1, progress=False, - retry_limit=10, page_size=100, running: typing.Optional[bool]=False, - cortex_project=constants.CORTEX_WANDB_PROJECT, - cortex_type=constants.CORTEX_WANDB_TYPE): - api = wandb.Api(timeout=100) - - filters = [ - { "config.type": cortex_type } - ] + def __init__( + self, + latest=True, + random_seed: typing.Optional[int] = None, + max_samples=300, + steps: typing.Optional[int] = 1, + progress=False, + retry_limit=10, + page_size=100, + running: typing.Optional[bool] = False, + cortex_project=constants.CORTEX_WANDB_PROJECT, + cortex_type=constants.CORTEX_WANDB_TYPE, + ): + self.api = wandb.Api(timeout=100) + self.filters = [{"config.type": cortex_type}] if running: - filters.append( {"state": "running"} ) - runs = api.runs(cortex_project, filters={"$and": filters}) + self.filters.append({"state": "running"}) + self.runs = self.api.runs(cortex_project, filters={"$and": self.filters}) + self.retry_delay = 5 # Seconds to wait between retries + self.max_samples = max_samples + self.steps = steps + self.progress = progress + self.retry_limit = retry_limit + self.page_size = page_size + self.latest = latest + self.generator = np.random.default_rng(seed=random_seed) if random_seed is not None else None - retry_delay = 5 # Seconds to wait between retries - attempt = 0 + self.run_order = list(range(len(self.runs))) + if self.generator is not None: + self.generator.shuffle(self.run_order) - generator = np.random.default_rng(seed=random_seed) if random_seed else None + self.last_steps = [] + for run in self.runs: + if self.latest: + last_step: int = run.lastHistoryStep + else: + last_step = int(self.generator.random() * run.lastHistoryStep) if self.generator is not None else 0 + self.last_steps.append(last_step) - while attempt < retry_limit: - try: - run_order = list(range(len(runs))) + self.buffer: typing.List[typing.Tuple[str, str]] = [] + self.selected_runs: typing.List[int] = [] - if generator is not None: - generator.shuffle(run_order) + self.fetch_data() - self.buffer: typing.List[typing.Tuple[str, str]] = [] - self.selected_runs: typing.List[int] = [] + def fetch_data(self): + attempt = 0 - for run_index in tqdm(run_order, desc="Run", leave=False, disable=not progress): - run = runs[run_index] + while attempt < self.retry_limit: + try: + for run_index in tqdm(self.run_order, desc="Run", leave=False, disable=not self.progress): + run = self.runs[run_index] self.selected_runs.append(run_index) - if latest: - last_step: int = run.lastHistoryStep - elif generator is not None: - last_step = int(generator.random() * run.lastHistoryStep) - else: - last_step = 0 + last_step = self.last_steps[run_index] max_step = last_step + 1 - min_step = max(0, max_step - steps) if steps is not None else 0 - history_scan = HistoryScan(run.client, run, min_step, max_step, page_size=page_size) - while True: - try: - sample = 
next(history_scan) - for uid in range(constants.CORTEX_MAX_UIDS): - try: - prompt: typing.Optional[str] = sample[f"prompts.{uid}"] - response: typing.Optional[str] = sample[f"responses.{uid}"] - if isinstance(prompt, str) and isinstance(response, str): - prompt = prompt.strip() - response = response.strip() - if len(prompt) > 0 and len(response) > 0: - if not any(x in response for x in UNWANTED_PHRASES): - self.buffer.append((prompt, response)) - if len(self.buffer) == max_samples: - return - except KeyError: - pass - except StopIteration: - break - bt.logging.warning(f"Did not collect {max_samples}, only got {len(self.buffer)}") + min_step = max(0, max_step - self.steps) if self.steps is not None else 0 + + history_scan = HistoryScan(run.client, run, min_step, max_step, page_size=self.page_size) + self.process_history_scan(history_scan) + + if len(self.buffer) == self.max_samples: + return + + bt.logging.warning(f"Did not collect {self.max_samples}, only got {len(self.buffer)}") return - except: + + except Exception as e: attempt += 1 - bt.logging.warning( - f"Failed to fetch data, retrying. Attempt {attempt}/{retry_limit}" - ) - if attempt < retry_limit: - time.sleep(retry_delay) # Wait before the next retry + bt.logging.warning(f"Failed to fetch data, retrying. Attempt {attempt}/{self.retry_limit}") + if attempt < self.retry_limit: + time.sleep(self.retry_delay) # Wait before the next retry else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") + raise e + + def process_history_scan(self, history_scan): + while True: + try: + sample = next(history_scan) + for uid in range(constants.CORTEX_MAX_UIDS): + prompt: typing.Optional[str] = sample.get(f"prompts.{uid}") + response: typing.Optional[str] = sample.get(f"responses.{uid}") + + if isinstance(prompt, str) and isinstance(response, str): + prompt = prompt.strip() + response = response.strip() + + if prompt and response and not any(x in response for x in UNWANTED_PHRASES): + self.buffer.append((prompt, response)) + if len(self.buffer) == self.max_samples: + return + + except StopIteration: + break def tokenize(self, tokenizer: PreTrainedTokenizerBase) -> typing.List[typing.Tuple[torch.Tensor, int]]: batches = [] diff --git a/neurons/validator.py b/neurons/validator.py index c5436b1..6cb96ee 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -491,7 +491,7 @@ async def run_step(self): """ Executes a step in the evaluation process of models. This function performs several key tasks: 1. Identifies valid models for evaluation (top 5 from last run + newly updated models). - 2. Generates random pages for evaluation and prepares batches for each page from the dataset. + 2. Generates deterministic pages (using block hash as seed) for evaluation and prepares batches for each page from the dataset. 3. Computes the scoring for each model based on the losses incurred on the evaluation batches. 4. Calculates wins and win rates for each model to determine their performance relative to others. 5. Updates the weights of each model based on their performance and applies a softmax normalization. 
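Patch 01's key idea in isolation: derive the dataset-shuffle seed from a value every validator can compute identically. A minimal sketch of that seeding scheme, assuming a hex block hash of the usual "0x..." form (the hash below is invented for illustration):

import numpy as np

block_hash = "0x9f86d081884c7d659a2feaa0c55ad015"  # hypothetical hash, illustration only
seed = int(block_hash, 0)           # base 0 honors the "0x" prefix when parsing
rng = np.random.default_rng(seed)   # identical hash -> identical generator state
order = list(range(10))
rng.shuffle(order)                  # every validator reproduces the same run order

The same integer is later passed to CortexSubsetLoader(random_seed=seed), so run shuffling and step selection become reproducible across validators.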
@@ -501,6 +501,12 @@ async def run_step(self): # Update self.metagraph await self.try_sync_metagraph(ttl=60) + block_number = self.metagraph.block.item() + print(f"Block number: {block_number}") + block_hash = self.subtensor.get_block_hash(block_id=block_number) + print(f"Block hash: {block_hash}") + seed = int(block_hash, 0) + print(f"Seed: {seed}") competition_parameters = constants.COMPETITION_SCHEDULE[self.global_step % len(constants.COMPETITION_SCHEDULE)] @@ -535,7 +541,7 @@ async def run_step(self): with pull_data_perf.sample(): cortex_data = ft.dataset.CortexSubsetLoader( latest=True, running=True, - random_seed=random.randint(0, sys.maxsize), + random_seed=seed, max_samples=self.config.latest_cortex_samples, steps=self.config.latest_cortex_steps, page_size=self.config.latest_cortex_steps, @@ -571,10 +577,10 @@ async def run_step(self): for other_uid, (other_hotkey, other_metadata) in uid_to_hotkey_and_model_metadata.items(): if other_metadata and model_i_metadata.id.hash == other_metadata.id.hash: if model_i_metadata.block < other_metadata.block: - bt.logging.debug(f"Perferring duplicate of {other_uid} with {uid_i} since it is older") + bt.logging.debug(f"Preferring duplicate of {other_uid} with {uid_i} since it is older") uid_to_hotkey_and_model_metadata[other_uid] = (other_hotkey, None) else: - bt.logging.debug(f"Perferring duplicate of {uid_i} with {other_uid} since it is newer") + bt.logging.debug(f"Preferring duplicate of {uid_i} with {other_uid} since it is newer") model_i_metadata = None break From b7ec906d6721b86fa8b51de78fe997357e21656f Mon Sep 17 00:00:00 2001 From: Pyython Date: Fri, 3 May 2024 15:40:13 -0400 Subject: [PATCH 02/11] try catch for block hash --- neurons/validator.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 6cb96ee..e169dcb 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -192,6 +192,7 @@ def __init__(self): self.subtensor = bt.subtensor(config=self.config) self.dendrite = bt.dendrite(wallet=self.wallet) self.metagraph: bt.metagraph = self.subtensor.metagraph(self.config.netuid) + self.block_hash: str = "0x0" torch.backends.cudnn.benchmark = True # Dont check registration status if offline. 
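The hunk below reuses the process-with-TTL guard this validator already applies to weight setting and metagraph sync. A minimal sketch of the guard, with a hypothetical endpoint; one caveat is that attribute writes made inside the child process land on a copy of the object, not the parent's instance, which is the issue the next patch in the series resolves:

import multiprocessing

def fetch_hash(endpoint: str) -> None:
    # Runs in a child process; assigning to self.block_hash here would
    # mutate the child's copy of the validator, never the parent's.
    ...

process = multiprocessing.Process(target=fetch_hash, args=("wss://example.endpoint",))  # hypothetical endpoint
process.start()
process.join(timeout=60)  # TTL in seconds
if process.is_alive():    # call hung past the TTL: terminate and move on
    process.terminate()
    process.join()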
@@ -455,6 +456,26 @@ async def _try_set_weights(): except asyncio.TimeoutError: bt.logging.error(f"Failed to set weights after {ttl} seconds") + async def try_get_blockhash(self, block_number: int, ttl: int): + def get_blockhash(endpoint): + # Update self.block_hash + self.block_hash = bt.subtensor(endpoint).get_block_hash(block_id=block_number) + + process = multiprocessing.Process( + target=get_blockhash, args=(self.subtensor.chain_endpoint,) + ) + process.start() + process.join(timeout=ttl) + if process.is_alive(): + process.terminate() + process.join() + bt.logging.error(f"Failed to fetch block hash after {ttl} seconds") + return + + bt.logging.info("Synced metagraph") + self.metagraph.load() + self.miner_iterator.set_miner_uids(self.metagraph.uids.tolist()) + async def try_sync_metagraph(self, ttl: int): def sync_metagraph(endpoint): # Update self.metagraph @@ -503,9 +524,9 @@ async def run_step(self): await self.try_sync_metagraph(ttl=60) block_number = self.metagraph.block.item() print(f"Block number: {block_number}") - block_hash = self.subtensor.get_block_hash(block_id=block_number) - print(f"Block hash: {block_hash}") - seed = int(block_hash, 0) + await self.try_get_blockhash(block_number, ttl=60) + print(f"Block hash: {self.block_hash}") + seed = int(self.block_hash, 0) print(f"Seed: {seed}") competition_parameters = constants.COMPETITION_SCHEDULE[self.global_step % len(constants.COMPETITION_SCHEDULE)] From baa105c81974536f5eb45bd311690c43e641e4ee Mon Sep 17 00:00:00 2001 From: Pyython Date: Fri, 3 May 2024 22:07:25 -0400 Subject: [PATCH 03/11] use multiprocessing value for block hash --- neurons/validator.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index e169dcb..4f1bc9c 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -192,7 +192,7 @@ def __init__(self): self.subtensor = bt.subtensor(config=self.config) self.dendrite = bt.dendrite(wallet=self.wallet) self.metagraph: bt.metagraph = self.subtensor.metagraph(self.config.netuid) - self.block_hash: str = "0x0" + self.block_hash = multiprocessing.Manager().Value(typecode='c', value='0x0'.encode()) torch.backends.cudnn.benchmark = True # Dont check registration status if offline. 
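Patch 03 swaps the plain string attribute for a Manager-backed Value so the child process can hand its result back across the process boundary. A minimal sketch of that mechanism (the hash value is invented; a __main__ guard is needed on spawn-based platforms):

import multiprocessing

def worker(shared) -> None:
    shared.value = b"0xdeadbeef"  # write through the proxy: visible to the parent

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    block_hash = manager.Value(typecode="c", value="0x0".encode())  # shared bytes cell
    p = multiprocessing.Process(target=worker, args=(block_hash,))
    p.start()
    p.join()
    print(block_hash.value.decode())  # prints "0xdeadbeef"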
@@ -456,13 +456,13 @@ async def _try_set_weights(): except asyncio.TimeoutError: bt.logging.error(f"Failed to set weights after {ttl} seconds") - async def try_get_blockhash(self, block_number: int, ttl: int): - def get_blockhash(endpoint): - # Update self.block_hash - self.block_hash = bt.subtensor(endpoint).get_block_hash(block_id=block_number) - + async def try_get_block_hash(self, block_number: int, ttl: int): + def get_block_hash(endpoint, block_hash): + # Update the shared value + block_hash.value = bt.subtensor(endpoint).get_block_hash(block_id=block_number).encode() + process = multiprocessing.Process( - target=get_blockhash, args=(self.subtensor.chain_endpoint,) + target=get_block_hash, args=(self.subtensor.chain_endpoint, self.block_hash) ) process.start() process.join(timeout=ttl) @@ -472,10 +472,7 @@ def get_blockhash(endpoint): bt.logging.error(f"Failed to fetch block hash after {ttl} seconds") return - bt.logging.info("Synced metagraph") - self.metagraph.load() - self.miner_iterator.set_miner_uids(self.metagraph.uids.tolist()) - + bt.logging.info(f"Updated block hash: {self.block_hash.value.decode()}") async def try_sync_metagraph(self, ttl: int): def sync_metagraph(endpoint): # Update self.metagraph @@ -524,9 +521,10 @@ async def run_step(self): await self.try_sync_metagraph(ttl=60) block_number = self.metagraph.block.item() print(f"Block number: {block_number}") - await self.try_get_blockhash(block_number, ttl=60) - print(f"Block hash: {self.block_hash}") - seed = int(self.block_hash, 0) + await self.try_get_block_hash(block_number, ttl=60) + block_hash = self.block_hash.value.decode() + print(f"Block hash: {block_hash}") + seed = int(block_hash, 0) print(f"Seed: {seed}") competition_parameters = constants.COMPETITION_SCHEDULE[self.global_step % len(constants.COMPETITION_SCHEDULE)] @@ -886,4 +884,4 @@ async def run(self): if __name__ == "__main__": - asyncio.run(Validator().run()) + asyncio.run(Validator().run()) \ No newline at end of file From 37387bee6b76201bbea11c20954ca05509f9a4e8 Mon Sep 17 00:00:00 2001 From: Pyython Date: Tue, 14 May 2024 10:21:49 -0400 Subject: [PATCH 04/11] pass in subtensor with publish metadata --- model/storage/chain/chain_model_metadata_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/model/storage/chain/chain_model_metadata_store.py b/model/storage/chain/chain_model_metadata_store.py index a78becd..ae7a3f3 100644 --- a/model/storage/chain/chain_model_metadata_store.py +++ b/model/storage/chain/chain_model_metadata_store.py @@ -35,6 +35,7 @@ async def store_model_metadata(self, hotkey: str, model_id: ModelId, wait_for_in # Wrap calls to the subtensor in a subprocess with a timeout to handle potential hangs. 
partial = functools.partial( bt.extrinsics.serving.publish_metadata, + self.subtensor, self.wallet, self.subnet_uid, f"Raw{len(data)}", From 5a452ef12ce900bf250c0c8ad676e4c0503c25e4 Mon Sep 17 00:00:00 2001 From: Pyython Date: Fri, 17 May 2024 20:37:50 -0400 Subject: [PATCH 05/11] verify run data --- finetune/dataset.py | 67 ++++++++++++++++++++++------------- neurons/miner.py | 1 + neurons/validator.py | 4 ++- utilities/model_validation.py | 2 ++ 4 files changed, 49 insertions(+), 25 deletions(-) diff --git a/finetune/dataset.py b/finetune/dataset.py index 2cd47cb..1b486d3 100644 --- a/finetune/dataset.py +++ b/finetune/dataset.py @@ -18,6 +18,7 @@ import typing import torch import bittensor as bt +from substrateinterface import Keypair import wandb from torch.utils.data import IterableDataset from wandb.apis.public.history import HistoryScan @@ -25,7 +26,6 @@ import constants import time import numpy as np -import math from tqdm import tqdm UNWANTED_PHRASES = [ @@ -216,16 +216,22 @@ ] class CortexSubsetLoader(IterableDataset): - def __init__(self, latest=True, random_seed: typing.Optional[int] = None, - max_samples=300, steps: typing.Optional[int]=1, progress=False, - retry_limit=10, page_size=100, running: typing.Optional[bool]=False, - cortex_project=constants.CORTEX_WANDB_PROJECT, - cortex_type=constants.CORTEX_WANDB_TYPE): + def __init__( + self, + subtensor: typing.Optional[bt.subtensor] = None, + latest=True, + random_seed: typing.Optional[int] = None, + max_samples=300, + steps: typing.Optional[int]=1, + progress=False, + retry_limit=10, + page_size=100, + running: typing.Optional[bool]=False, + cortex_project=constants.CORTEX_WANDB_PROJECT, + cortex_type=constants.CORTEX_WANDB_TYPE): api = wandb.Api(timeout=100) - filters = [ - { "config.type": cortex_type } - ] + filters = [{ "config.type": cortex_type }] if running: filters.append( {"state": "running"} ) runs = api.runs(cortex_project, filters={"$and": filters}) @@ -247,7 +253,18 @@ def __init__(self, latest=True, random_seed: typing.Optional[int] = None, for run_index in tqdm(run_order, desc="Run", leave=False, disable=not progress): run = runs[run_index] - self.selected_runs.append(run_index) + if run.config: + id = run.id + hotkey = run.config.get("hotkey") + signature = run.config.get("signature") + if id and hotkey and signature: + keypair = Keypair(ss58_address=hotkey) + verified = keypair.verify(id.encode(), bytes.fromhex(signature)) + if verified and subtensor is not None: + stake = subtensor.get_total_stake_for_hotkey(hotkey) + stake_int = int(stake) + if stake_int > 25000000000000: + self.selected_runs.append(run_index) if latest: last_step: int = run.lastHistoryStep @@ -261,20 +278,22 @@ def __init__(self, latest=True, random_seed: typing.Optional[int] = None, while True: try: sample = next(history_scan) - for uid in range(constants.CORTEX_MAX_UIDS): - try: - prompt: typing.Optional[str] = sample[f"prompts.{uid}"] - response: typing.Optional[str] = sample[f"responses.{uid}"] - if isinstance(prompt, str) and isinstance(response, str): - prompt = prompt.strip() - response = response.strip() - if len(prompt) > 0 and len(response) > 0: - if not any(x in response for x in UNWANTED_PHRASES): - self.buffer.append((prompt, response)) - if len(self.buffer) == max_samples: - return - except KeyError: - pass + if sample and sample["modality"] == "text": + for uid in range(constants.CORTEX_MAX_UIDS): + try: + prompt: typing.Optional[str] = sample[f"prompts.{uid}"] + response: typing.Optional[str] = sample[f"responses.{uid}"] + 
score: typing.Optional[float] = sample[f"scores.{uid}"] + if isinstance(prompt, str) and isinstance(response, str) and isinstance(score, float): + prompt = prompt.strip() + response = response.strip() + if len(prompt) > 0 and len(response) > 0 and score > 0.0: + if not any(x in response for x in UNWANTED_PHRASES): + self.buffer.append((prompt, response)) + if len(self.buffer) == max_samples: + return + except KeyError: + pass except StopIteration: break bt.logging.warning(f"Did not collect {max_samples}, only got {len(self.buffer)}") diff --git a/neurons/miner.py b/neurons/miner.py index d202215..2ab2ceb 100644 --- a/neurons/miner.py +++ b/neurons/miner.py @@ -309,6 +309,7 @@ async def main(config: bt.config): f"Loading {config.cortex_samples_per_epoch} pages for training this epoch" ) loader = ft.dataset.CortexSubsetLoader( + subtensor=subtensor, latest=False, random_seed=random.randint(0, 100000000), max_samples=config.cortex_samples_per_epoch, diff --git a/neurons/validator.py b/neurons/validator.py index 4021dcb..2820714 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -534,7 +534,9 @@ async def run_step(self): pull_data_perf = PerfMonitor("Eval: Pull data") with pull_data_perf.sample(): cortex_data = ft.dataset.CortexSubsetLoader( - latest=True, running=True, + subtensor=self.subtensor, + latest=True, + running=True, random_seed=random.randint(0, sys.maxsize), max_samples=self.config.latest_cortex_samples, steps=self.config.latest_cortex_steps, diff --git a/utilities/model_validation.py b/utilities/model_validation.py index f137844..b2a0755 100644 --- a/utilities/model_validation.py +++ b/utilities/model_validation.py @@ -2,6 +2,7 @@ import math import random +import bittensor as bt from model.data import ModelId from transformers import AutoTokenizer from finetune.dataset import CortexSubsetLoader @@ -99,6 +100,7 @@ def main(): with pull_data_perf.sample(): cortex_data = CortexSubsetLoader( + subtensor=bt.subtensor(), latest=True, running=True, random_seed=random.randint(0, sys.maxsize), From 7fba87c45d8a3cc23fd90986189dfaf4a78e3671 Mon Sep 17 00:00:00 2001 From: Pyython Date: Fri, 17 May 2024 21:22:07 -0400 Subject: [PATCH 06/11] conservative score filtering --- finetune/dataset.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/finetune/dataset.py b/finetune/dataset.py index 1b486d3..8f26a8a 100644 --- a/finetune/dataset.py +++ b/finetune/dataset.py @@ -284,12 +284,16 @@ def __init__( prompt: typing.Optional[str] = sample[f"prompts.{uid}"] response: typing.Optional[str] = sample[f"responses.{uid}"] score: typing.Optional[float] = sample[f"scores.{uid}"] - if isinstance(prompt, str) and isinstance(response, str) and isinstance(score, float): + if isinstance(prompt, str) and isinstance(response, str): prompt = prompt.strip() response = response.strip() - if len(prompt) > 0 and len(response) > 0 and score > 0.0: + if len(prompt) > 0 and len(response) > 0: if not any(x in response for x in UNWANTED_PHRASES): - self.buffer.append((prompt, response)) + if score and isinstance(score, float): + if score > 0.0: + self.buffer.append((prompt, response)) + else: + self.buffer.append((prompt, response)) if len(self.buffer) == max_samples: return except KeyError: From bb0228967750b495a09962e7cc493ed780212705 Mon Sep 17 00:00:00 2001 From: chpiatt Date: Sat, 18 May 2024 12:19:33 -0400 Subject: [PATCH 07/11] keyerror protection --- .gitignore | 2 ++ finetune/dataset.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore 
b/.gitignore index 6924c4e..33ec8f2 100644 --- a/.gitignore +++ b/.gitignore @@ -173,3 +173,5 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +run.sh diff --git a/finetune/dataset.py b/finetune/dataset.py index 8f26a8a..76f3265 100644 --- a/finetune/dataset.py +++ b/finetune/dataset.py @@ -278,7 +278,7 @@ def __init__( while True: try: sample = next(history_scan) - if sample and sample["modality"] == "text": + if sample and sample.get("modality") == "text": for uid in range(constants.CORTEX_MAX_UIDS): try: prompt: typing.Optional[str] = sample[f"prompts.{uid}"] From 763021056c3b9277cd76f3a5d7ba26ca0b36355a Mon Sep 17 00:00:00 2001 From: chpiatt Date: Sat, 18 May 2024 12:43:33 -0400 Subject: [PATCH 08/11] updated version --- constants/__init__.py | 2 +- finetune/__init__.py | 10 ---------- neurons/miner.py | 3 ++- neurons/validator.py | 3 ++- 4 files changed, 5 insertions(+), 13 deletions(-) diff --git a/constants/__init__.py b/constants/__init__.py index 9cd0a26..be026e9 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -25,7 +25,7 @@ class CompetitionParameters: # Project Constants. # --------------------------------- -__version__ = "0.2.7" +__version__ = "0.2.8" version_split = __version__.split(".") __spec_version__ = ( (1000 * int(version_split[0])) diff --git a/finetune/__init__.py b/finetune/__init__.py index 2149d2d..16b9e3e 100644 --- a/finetune/__init__.py +++ b/finetune/__init__.py @@ -15,16 +15,6 @@ # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. -__version__ = "0.2.7" - - -version_split = __version__.split(".") -__spec_version__ = ( - (1000 * int(version_split[0])) - + (10 * int(version_split[1])) - + (1 * int(version_split[2])) -) - from . import dataset from . import graph from . 
import mining diff --git a/neurons/miner.py b/neurons/miner.py index 2ab2ceb..e253027 100644 --- a/neurons/miner.py +++ b/neurons/miner.py @@ -34,6 +34,7 @@ from finetune.mining import Actions from utilities import utils import datetime as dt +import constants from dotenv import load_dotenv @@ -282,7 +283,7 @@ async def main(config: bt.config): "uid": my_uid, "hotkey": wallet.hotkey.ss58_address, "run_name": run_id, - "version": ft.__version__, + "version": constants.__version__, "type": "miner", }, allow_val_change=True, diff --git a/neurons/validator.py b/neurons/validator.py index 2820714..83dc43e 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -50,6 +50,7 @@ from utilities import utils from utilities.perf_monitor import PerfMonitor from transformers import AutoTokenizer, GenerationConfig +import constants os.environ["TOKENIZERS_PARALLELISM"] = "true" @@ -329,7 +330,7 @@ def new_wandb_run(self): "uid": self.uid, "hotkey": self.wallet.hotkey.ss58_address, "run_name": run_id, - "version": ft.__version__, + "version": constants.__version__, "type": "validator", }, allow_val_change=True, From 76e7bd10a718072f46a71cbe60c0b05ef5167fef Mon Sep 17 00:00:00 2001 From: Pyython Date: Sat, 18 May 2024 14:34:16 -0400 Subject: [PATCH 09/11] refactor to improve error handling and legibility --- constants/__init__.py | 187 +++++++++++++++++++++ finetune/dataset.py | 378 +++++++++++++++--------------------------- 2 files changed, 320 insertions(+), 245 deletions(-) diff --git a/constants/__init__.py b/constants/__init__.py index be026e9..e087b3d 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -72,6 +72,193 @@ class CompetitionParameters: assert math.isclose(sum(x.reward_percentage for x in COMPETITION_SCHEDULE), 1.0) assert all(len(x.competition_id) > 0 and len(x.competition_id) <= 2 for x in COMPETITION_SCHEDULE) +UNWANTED_PHRASES = [ + "text-based AI language model", + "please refrain", + "it is never okay", + "It is important to", + "It's important to", + "real-world consequences", + "responsible AI", + "AI principles", + "AI assistant", + "an AI language", + "as a language model", + "as an AI language model", + "As a large language model", + "As an AI", + "ethical principles", + "it is not appropriate", + "it's not appropriate", + "I cannot fulfill your request", + "ethical guidelines", + "my guidelines", + "prioritize user safety", + "cannot provide guidance", + "cannot provide information", + "unable to offer assistance", + "cannot engage in discussions", + "programming prohibits", + "follow ethical guidelines", + "cannot support or promote", + "against my programming", + "not able to provide", + "cannot provide any information", + "an AI language model you don't have", + "As an AI language model, I cannot", + "As an AI language model, I do not", + "As an AI language model, I am not able", + "As an AI language model, I don't have personal", + "I am an AI language model and do not", + "However, it is important to use any code or information provided responsibly and within legal and ethical boundaries.", + "As an AI language model, I don't have", + "As an AI language model, I am only able", + "AI language model and I do not", + "As an AI language model, I cannot modify", + "As an AI language model, I do not", + "I know as an AI language model you don't have", + "as an AI language model, you cannot", + "I'm sorry, but as an AI language model", + "As an AI language model, I don't have", + "Unfortunately, I cannot provide", + "I'm sorry, I cannot", + "I'm sorry, I 
cannot generate", + "AI cannot create or program", + "I'm afraid I cannot create", + "I cannot assist", + "I'm sorry,", + "I'm an AI" , + "I am an AI", + "my purpose", + "entertainment purposes", + "purely hypothetical", + "not a human", + "I am an AI", + "cannot provide", + "can't provide", + "won't provide", + "not provide", + "a language model", + "As a machine", + "I don't have the ability", + "I am here to assist", + "my purpose is to ", + "my knowledge cutoff", + "my knowledge cut off", + "September 2021", + "I apologize, but", + "It is not possible", + "Please note", + "not acceptable", + "*This chat conversation is shared from", + "*This conversation is shared from", + "<|endoftext|>", + "Я разработчик", + "I'm sorry, I cannot", + "breach of", + "privacy policy", + "I am programmed to", + "As a helpful assistant", + "I don't have beliefs", + "I don't have personal", + "I don't have a personal", + "I don't have emotions", + "I don't have the ability to feel", + "I don't have a physical", + "I don't have physical", + "I don't have the ability to remember", + "I don't have access to real-time", + "I don't have sensors or a physical body", + "I don't have sensory input", + "I don't have a sense", + "I don't have the capability to perceive", + "I don't have the capability to feel", + "I am an artificial intelligence", + "I don't have access to real-time", + "I don't have beliefs or disagreements", + "I do not have a sense of", + "I do not have beliefs", + "I do not have personal", + "I do not have a personal", + "I do not have emotions", + "I do not have the ability to feel", + "I do not have a physical", + "I do not have physical", + "I do not have the ability to remember", + "I do not have access to real-time", + "I do not have sensors or a physical body", + "I do not have sensory input", + "I do not have a sense", + "I do not have the capability to perceive", + "I do not have the capability to feel", + "I am an artificial intelligence", + "I do not have access to real-time", + "I do not have beliefs or disagreements", + "I do not have a sense of", + "September 2021", + "as a language model", + "ethical guidelines", + "as an AI language model", + "my guidelines", + "As an AI", + "cannot provide guidance", + "cannot provide information", + "unable to offer assistance", + "cannot engage in discussions", + "programming prohibits", + "cannot support or promote", + "activities that could harm", + "against my programming", + "activities that could undermine", + "not within the scope", + "designed to prioritize safety", + "not able to provide", + "maintain user safety", + "adhere to safety guidelines", + "dangerous or harmful", + "cannot provide any information", + "focus on promoting safety", + "maintain user safety", + "focus on promoting safety", + "it is never okay", + "September 2021", + "as a language model", + "ethical guidelines", + "as an AI language model", + "my guidelines", + "As an AI", + "prioritize user safety", + "adhere to ethical guidelines", + "promote safety", + "responsible information sharing", + "jeopardize the safety", + "safe information", + "cannot provide guidance", + "cannot provide information", + "unable to offer assistance", + "cannot engage in discussions", + "programming prohibits", + "prioritize safety", + "cannot support or promote", + "activities that could harm", + "against my programming", + "potentially dangerous", + "not within the scope", + "not able to provide", + "cannot provide any information", + "I don't have beliefs" + "I don't have personal" + 
"gpt", + "gpT", + "gPt", + "Gpt", + "gPT", + "GpT", + "GPt", + "GPT", + "gpt" +] + # --------------------------------- # Miner/Validator Model parameters. # --------------------------------- diff --git a/finetune/dataset.py b/finetune/dataset.py index 7f6d613..ed97975 100644 --- a/finetune/dataset.py +++ b/finetune/dataset.py @@ -28,194 +28,24 @@ import numpy as np from tqdm import tqdm -UNWANTED_PHRASES = [ - "text-based AI language model", - "please refrain", - "it is never okay", - "It is important to", - "It's important to", - "real-world consequences", - "responsible AI", - "AI principles", - "AI assistant", - "an AI language", - "as a language model", - "as an AI language model", - "As a large language model", - "As an AI", - "ethical principles", - "it is not appropriate", - "it's not appropriate", - "I cannot fulfill your request", - "ethical guidelines", - "my guidelines", - "prioritize user safety", - "cannot provide guidance", - "cannot provide information", - "unable to offer assistance", - "cannot engage in discussions", - "programming prohibits", - "follow ethical guidelines", - "cannot support or promote", - "against my programming", - "not able to provide", - "cannot provide any information", - "an AI language model you don't have", - "As an AI language model, I cannot", - "As an AI language model, I do not", - "As an AI language model, I am not able", - "As an AI language model, I don't have personal", - "I am an AI language model and do not", - "However, it is important to use any code or information provided responsibly and within legal and ethical boundaries.", - "As an AI language model, I don't have", - "As an AI language model, I am only able", - "AI language model and I do not", - "As an AI language model, I cannot modify", - "As an AI language model, I do not", - "I know as an AI language model you don't have", - "as an AI language model, you cannot", - "I'm sorry, but as an AI language model", - "As an AI language model, I don't have", - "Unfortunately, I cannot provide", - "I'm sorry, I cannot", - "I'm sorry, I cannot generate", - "AI cannot create or program", - "I'm afraid I cannot create", - "I cannot assist", - "I'm sorry,", - "I'm an AI" , - "I am an AI", - "my purpose", - "entertainment purposes", - "purely hypothetical", - "not a human", - "I am an AI", - "cannot provide", - "can't provide", - "won't provide", - "not provide", - "a language model", - "As a machine", - "I don't have the ability", - "I am here to assist", - "my purpose is to ", - "my knowledge cutoff", - "my knowledge cut off", - "September 2021", - "I apologize, but", - "It is not possible", - "Please note", - "not acceptable", - "*This chat conversation is shared from", - "*This conversation is shared from", - "<|endoftext|>", - "Я разработчик", - "I'm sorry, I cannot", - "breach of", - "privacy policy", - "I am programmed to", - "As a helpful assistant", - "I don't have beliefs", - "I don't have personal", - "I don't have a personal", - "I don't have emotions", - "I don't have the ability to feel", - "I don't have a physical", - "I don't have physical", - "I don't have the ability to remember", - "I don't have access to real-time", - "I don't have sensors or a physical body", - "I don't have sensory input", - "I don't have a sense", - "I don't have the capability to perceive", - "I don't have the capability to feel", - "I am an artificial intelligence", - "I don't have access to real-time", - "I don't have beliefs or disagreements", - "I do not have a sense of", - "I do not have beliefs", 
- "I do not have personal", - "I do not have a personal", - "I do not have emotions", - "I do not have the ability to feel", - "I do not have a physical", - "I do not have physical", - "I do not have the ability to remember", - "I do not have access to real-time", - "I do not have sensors or a physical body", - "I do not have sensory input", - "I do not have a sense", - "I do not have the capability to perceive", - "I do not have the capability to feel", - "I am an artificial intelligence", - "I do not have access to real-time", - "I do not have beliefs or disagreements", - "I do not have a sense of", - "September 2021", - "as a language model", - "ethical guidelines", - "as an AI language model", - "my guidelines", - "As an AI", - "cannot provide guidance", - "cannot provide information", - "unable to offer assistance", - "cannot engage in discussions", - "programming prohibits", - "cannot support or promote", - "activities that could harm", - "against my programming", - "activities that could undermine", - "not within the scope", - "designed to prioritize safety", - "not able to provide", - "maintain user safety", - "adhere to safety guidelines", - "dangerous or harmful", - "cannot provide any information", - "focus on promoting safety", - "maintain user safety", - "focus on promoting safety", - "it is never okay", - "September 2021", - "as a language model", - "ethical guidelines", - "as an AI language model", - "my guidelines", - "As an AI", - "prioritize user safety", - "adhere to ethical guidelines", - "promote safety", - "responsible information sharing", - "jeopardize the safety", - "safe information", - "cannot provide guidance", - "cannot provide information", - "unable to offer assistance", - "cannot engage in discussions", - "programming prohibits", - "prioritize safety", - "cannot support or promote", - "activities that could harm", - "against my programming", - "potentially dangerous", - "not within the scope", - "not able to provide", - "cannot provide any information", - "I don't have beliefs" - "I don't have personal" - "gpt", - "gpT", - "gPt", - "Gpt", - "gPT", - "GpT", - "GPt", - "GPT", - "gpt" -] - class CortexSubsetLoader(IterableDataset): + """ + A dataset loader for fetching subsets of data from WandB Cortex project runs. + + Args: + subtensor (bt.subtensor, optional): Bittensor subtensor instance. + latest (bool): Whether to fetch the latest data. + random_seed (int, optional): Random seed for shuffling. + max_samples (int): Maximum number of samples to fetch. + steps (int, optional): Number of steps to fetch. + progress (bool): Whether to display progress bars. + retry_limit (int): Number of retry attempts for fetching data. + page_size (int): Page size for fetching data from WandB. + running (bool, optional): Whether to fetch only running runs. + cortex_project (str): Name of the WandB Cortex project. + cortex_type (str): Type of the Cortex project. + """ + def __init__( self, subtensor: typing.Optional[bt.subtensor] = None, @@ -264,85 +94,143 @@ def __init__( self.fetch_data() def fetch_data(self): + """ + Fetches data from the WandB Cortex project runs with retry logic. 
+ """ attempt = 0 - while attempt < self.retry_limit: try: - for run_index in tqdm(self.run_order, desc="Run", leave=False, disable=not self.progress): - run = self.runs[run_index] - if run.config: - id = run.id - hotkey = run.config.get("hotkey") - signature = run.config.get("signature") - if id and hotkey and signature: - keypair = Keypair(ss58_address=hotkey) - verified = keypair.verify(id.encode(), bytes.fromhex(signature)) - if verified and self.subtensor is not None: - stake = self.subtensor.get_total_stake_for_hotkey(hotkey) - stake_int = int(stake) - if stake_int > 25000000000000: - self.selected_runs.append(run_index) - - last_step = self.last_steps[run_index] - max_step = last_step + 1 - min_step = max(0, max_step - self.steps) if self.steps is not None else 0 - history_scan = HistoryScan(run.client, run, min_step, max_step, page_size=self.page_size) - while True: - try: - sample = next(history_scan) - if sample and sample.get("modality") == "text": - for uid in range(constants.CORTEX_MAX_UIDS): - try: - prompt: typing.Optional[str] = sample[f"prompts.{uid}"] - response: typing.Optional[str] = sample[f"responses.{uid}"] - score: typing.Optional[float] = sample[f"scores.{uid}"] - if isinstance(prompt, str) and isinstance(response, str): - prompt = prompt.strip() - response = response.strip() - if len(prompt) > 0 and len(response) > 0: - if not any(x in response for x in UNWANTED_PHRASES): - if score and isinstance(score, float): - if score > 0.0: - self.buffer.append((prompt, response)) - else: - self.buffer.append((prompt, response)) - if len(self.buffer) == self.max_samples: - return - except KeyError: - pass - except StopIteration: - break - bt.logging.warning(f"Did not collect {self.max_samples}, only got {len(self.buffer)}") + self.process_runs() + if len(self.buffer) < self.max_samples: + bt.logging.warning(f"Did not collect {self.max_samples}, only got {len(self.buffer)}") return - except: + except Exception as e: attempt += 1 bt.logging.warning( - f"Failed to fetch data, retrying. Attempt {attempt}/{self.retry_limit}" + f"Failed to fetch data: {e}. Retrying in {self.retry_delay} seconds. Attempt {attempt}/{self.retry_limit}" ) if attempt < self.retry_limit: time.sleep(self.retry_delay) # Wait before the next retry else: bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." + f"Maximum retry limit reached. Unable to fetch data. Error: {e}" ) raise + def process_runs(self): + """ + Processes each run to verify that it belongs to a validator w/ minimum stake and then collect samples from the run. + """ + for run_index in tqdm(self.run_order, desc="Run", leave=False, disable=not self.progress): + run = self.runs[run_index] + if run.config: + self.verify_and_select_run(run, run_index) + self.collect_samples(run, run_index) + if len(self.buffer) >= self.max_samples: + return + + def verify_and_select_run(self, run, run_index): + """ + Verifies the run by checking that it is signed by an active Bittensor validator that meets minimum stake criteria. + + Args: + run: The WandB run to verify. + run_index: The index of the run in the list of runs. 
+ """ + try: + id = run.id + hotkey = run.config.get("hotkey") + signature = run.config.get("signature") + if id and hotkey and signature: + keypair = Keypair(ss58_address=hotkey) + verified = keypair.verify(id.encode(), bytes.fromhex(signature)) + if verified and self.subtensor is not None: + stake = self.subtensor.get_total_stake_for_hotkey(hotkey) + stake_int = int(stake) + if stake_int > 25000000000000: + self.selected_runs.append(run_index) + except Exception as e: + bt.logging.warning(f"Error verifying run {run.id}: {e}") + + def collect_samples(self, run, run_index): + """ + Collects samples from the verified runs. + + Args: + run: The WandB run to collect samples from. + run_index: The index of the run in the list of runs. + """ + last_step = self.last_steps[run_index] + max_step = last_step + 1 + min_step = max(0, max_step - self.steps) if self.steps is not None else 0 + history_scan = HistoryScan(run.client, run, min_step, max_step, page_size=self.page_size) + while True: + try: + sample = next(history_scan) + if sample and sample.get("modality") == "text": + self.extract_and_store_prompts(sample) + if len(self.buffer) >= self.max_samples: + return + except StopIteration: + break + except Exception as e: + bt.logging.warning(f"Error collecting samples from run {run.id}: {e}") + break + + def extract_and_store_prompts(self, sample): + """ + Extracts and stores prompts and responses from the sample. + + Args: + sample: The sample to extract prompts and responses from. + """ + for uid in range(constants.CORTEX_MAX_UIDS): + prompt: typing.Optional[str] = sample.get(f"prompts.{uid}") + response: typing.Optional[str] = sample.get(f"responses.{uid}") + score: typing.Optional[float] = sample.get(f"scores.{uid}") + if prompt and response: + prompt = prompt.strip() + response = response.strip() + if len(prompt) > 0 and len(response) > 0 and not any(x in response for x in constants.UNWANTED_PHRASES): + if score and isinstance(score, float) and score > 0.0: + self.buffer.append((prompt, response)) + elif not score: + self.buffer.append((prompt, response)) + def tokenize(self, tokenizer: PreTrainedTokenizerBase) -> typing.List[typing.Tuple[torch.Tensor, int]]: + """ + Tokenizes the collected prompts and responses. + + Args: + tokenizer (PreTrainedTokenizerBase): The tokenizer to use for tokenizing the data. + + Returns: + List[Tuple[torch.Tensor, int]]: A list of tokenized prompts and responses. + """ batches = [] for prompt, response in self: conversation = [ {"role": "user", "content": prompt}, {"role": "assistant", "content": response} ] - prompt_ids = tokenizer.apply_chat_template( - [conversation[0]], truncation=True, max_length=constants.sequence_length, - add_generation_prompt=True - ) - ids = tokenizer.apply_chat_template( - conversation, truncation=True, max_length=constants.sequence_length, - ) - batches.append((torch.stack([torch.tensor(ids)]), len(prompt_ids))) + try: + prompt_ids = tokenizer.apply_chat_template( + [conversation[0]], truncation=True, max_length=constants.sequence_length, + add_generation_prompt=True + ) + ids = tokenizer.apply_chat_template( + conversation, truncation=True, max_length=constants.sequence_length, + ) + batches.append((torch.stack([torch.tensor(ids)]), len(prompt_ids))) + except Exception as e: + bt.logging.warning(f"Error tokenizing conversation: {e}") return batches def __iter__(self): - return self.buffer.__iter__() + """ + Returns an iterator over the buffer containing collected prompts and responses. 
+ + Returns: + Iterator: An iterator over the buffer. + """ + return iter(self.buffer) From 269b007dccb234dc3ee718aa2f0087b6ea5de709 Mon Sep 17 00:00:00 2001 From: Pyython Date: Sat, 18 May 2024 20:46:38 -0400 Subject: [PATCH 10/11] update gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 33ec8f2..5c0ee20 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # VS Code .vscode/ +.DS_Store + test-models/ # Exclude the Miner's and Validator's directory for saving the models. From f79820dfef2360aa6196c7c5059007594e50901a Mon Sep 17 00:00:00 2001 From: chpiatt Date: Sun, 19 May 2024 15:27:11 -0400 Subject: [PATCH 11/11] cleanup --- finetune/dataset.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/finetune/dataset.py b/finetune/dataset.py index ed97975..95206f7 100644 --- a/finetune/dataset.py +++ b/finetune/dataset.py @@ -63,10 +63,6 @@ def __init__( self.api = wandb.Api(timeout=100) self.filters = [{"config.type": cortex_type}] self.subtensor = subtensor - - if running: - self.filters.append({"state": "running"}) - self.runs = self.api.runs(cortex_project, filters={"$and": self.filters}) self.retry_delay = 5 # Seconds to wait between retries self.max_samples = max_samples self.steps = steps @@ -74,9 +70,17 @@ def __init__( self.retry_limit = retry_limit self.page_size = page_size self.latest = latest - self.generator = np.random.default_rng(seed=random_seed) if random_seed is not None else None + + if running: + self.filters.append({"state": "running"}) + + self.runs = self.api.runs(cortex_project, filters={"$and": self.filters}) + if not self.runs: + raise ValueError("No runs found for the specified Cortex project and filters.") self.run_order = list(range(len(self.runs))) + + self.generator = np.random.default_rng(seed=random_seed) if random_seed is not None else None if self.generator is not None: self.generator.shuffle(self.run_order) @@ -122,6 +126,8 @@ def process_runs(self): Processes each run to verify that it belongs to a validator w/ minimum stake and then collect samples from the run. """ for run_index in tqdm(self.run_order, desc="Run", leave=False, disable=not self.progress): + if run_index >= len(self.runs): + break run = self.runs[run_index] if run.config: self.verify_and_select_run(run, run_index) @@ -146,9 +152,10 @@ def verify_and_select_run(self, run, run_index): verified = keypair.verify(id.encode(), bytes.fromhex(signature)) if verified and self.subtensor is not None: stake = self.subtensor.get_total_stake_for_hotkey(hotkey) - stake_int = int(stake) - if stake_int > 25000000000000: - self.selected_runs.append(run_index) + if stake is not None: + stake_int = int(stake) + if stake_int > 10000000000000: + self.selected_runs.append(run_index) except Exception as e: bt.logging.warning(f"Error verifying run {run.id}: {e}")
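Where the series lands on trust: a wandb run contributes samples only when its id is signed by the hotkey named in the run config and that hotkey clears a stake floor, which patch 11 lowers from 25,000 to 10,000 TAO (assuming the standard denomination of 1 TAO = 1e9 RAO). A condensed sketch of that check, with helper names of my own choosing:

from substrateinterface import Keypair

MIN_STAKE_RAO = 10_000_000_000_000  # 10,000 TAO expressed in RAO

def is_trusted_run(run_id: str, hotkey: str, signature_hex: str, stake) -> bool:
    # The run id must be signed by the hotkey the run's config claims.
    keypair = Keypair(ss58_address=hotkey)
    if not keypair.verify(run_id.encode(), bytes.fromhex(signature_hex)):
        return False
    # stake as returned by subtensor.get_total_stake_for_hotkey(hotkey)
    return stake is not None and int(stake) > MIN_STAKE_RAO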