From b315e6885cf70576ecf3b848ea10e30f6fc12662 Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Thu, 24 Aug 2023 13:51:26 +0300 Subject: [PATCH] Revert "Remove validators registation, rewards voting (#151)" (#153) This reverts commit e6175d53ebfa4668afc576b6538589fce72c91a3. --- oracle/keeper/contracts.py | 85 ++++++ oracle/keeper/health_server.py | 1 + oracle/keeper/typings.py | 11 + oracle/keeper/utils.py | 227 +++++++++++++-- oracle/networks.py | 81 ++++++ oracle/oracle/common/clients.py | 24 ++ oracle/oracle/common/eth1.py | 20 +- oracle/oracle/common/graphql_queries.py | 101 +++++++ oracle/oracle/main.py | 41 ++- oracle/oracle/rewards/__init__.py | 0 oracle/oracle/rewards/controller.py | 270 ++++++++++++++++++ oracle/oracle/rewards/eth1.py | 38 +++ oracle/oracle/rewards/eth2.py | 83 ++++++ oracle/oracle/rewards/tests/__init__.py | 0 .../oracle/rewards/tests/test_controller.py | 139 +++++++++ oracle/oracle/rewards/types.py | 27 ++ oracle/oracle/tests/test_clients.py | 4 + oracle/oracle/validators/__init__.py | 0 oracle/oracle/validators/controller.py | 125 ++++++++ oracle/oracle/validators/eth1.py | 128 +++++++++ oracle/oracle/validators/tests/__init__.py | 0 .../validators/tests/test_controller.py | 182 ++++++++++++ oracle/oracle/validators/types.py | 34 +++ oracle/oracle/vote.py | 5 +- oracle/settings.py | 2 + pyproject.toml | 2 +- 26 files changed, 1602 insertions(+), 28 deletions(-) create mode 100644 oracle/oracle/rewards/__init__.py create mode 100644 oracle/oracle/rewards/controller.py create mode 100644 oracle/oracle/rewards/eth1.py create mode 100644 oracle/oracle/rewards/eth2.py create mode 100644 oracle/oracle/rewards/tests/__init__.py create mode 100644 oracle/oracle/rewards/tests/test_controller.py create mode 100644 oracle/oracle/rewards/types.py create mode 100644 oracle/oracle/validators/__init__.py create mode 100644 oracle/oracle/validators/controller.py create mode 100644 oracle/oracle/validators/eth1.py create mode 100644 oracle/oracle/validators/tests/__init__.py create mode 100644 oracle/oracle/validators/tests/test_controller.py create mode 100644 oracle/oracle/validators/types.py diff --git a/oracle/keeper/contracts.py b/oracle/keeper/contracts.py index e136493a..24e886d3 100644 --- a/oracle/keeper/contracts.py +++ b/oracle/keeper/contracts.py @@ -46,6 +46,13 @@ def get_oracles_contract(web3_client: Web3) -> Contract: "stateMutability": "view", "type": "function", }, + { + "inputs": [], + "name": "currentValidatorsNonce", + "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], + "stateMutability": "view", + "type": "function", + }, { "inputs": [ {"internalType": "address", "name": "account", "type": "address"} @@ -104,5 +111,83 @@ def get_oracles_contract(web3_client: Web3) -> Contract: "stateMutability": "nonpayable", "type": "function", }, + { + "inputs": [ + { + "internalType": "uint256", + "name": "totalRewards", + "type": "uint256", + }, + { + "internalType": "uint256", + "name": "activatedValidators", + "type": "uint256", + }, + { + "internalType": "bytes[]", + "name": "signatures", + "type": "bytes[]", + }, + ], + "name": "submitRewards", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function", + }, + { + "inputs": [ + { + "components": [ + { + "internalType": "address", + "name": "operator", + "type": "address", + }, + { + "internalType": "bytes32", + "name": "withdrawalCredentials", + "type": "bytes32", + }, + { + "internalType": "bytes32", + "name": "depositDataRoot", + "type": "bytes32", + }, + { + "internalType": "bytes", + "name": "publicKey", + "type": "bytes", + }, + { + "internalType": "bytes", + "name": "signature", + "type": "bytes", + }, + ], + "internalType": "struct IPoolValidators.DepositData[]", + "name": "depositData", + "type": "tuple[]", + }, + { + "internalType": "bytes32[][]", + "name": "merkleProofs", + "type": "bytes32[][]", + }, + { + "internalType": "bytes32", + "name": "validatorsDepositRoot", + "type": "bytes32", + }, + { + "internalType": "bytes[]", + "name": "signatures", + "type": "bytes[]", + }, + ], + "name": "registerValidators", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function", + }, ], ) diff --git a/oracle/keeper/health_server.py b/oracle/keeper/health_server.py index 427dc7b9..f3cb230f 100644 --- a/oracle/keeper/health_server.py +++ b/oracle/keeper/health_server.py @@ -32,6 +32,7 @@ async def health(request): get_oracles_votes( web3_client=web3_client, rewards_nonce=params.rewards_nonce, + validators_nonce=params.validators_nonce, oracles=params.oracles, ) diff --git a/oracle/keeper/typings.py b/oracle/keeper/typings.py index 0520f61f..93150d56 100644 --- a/oracle/keeper/typings.py +++ b/oracle/keeper/typings.py @@ -2,8 +2,19 @@ from eth_typing import ChecksumAddress +from oracle.oracle.distributor.common.types import DistributorVote +from oracle.oracle.rewards.types import RewardVote +from oracle.oracle.validators.types import ValidatorsVote + class Parameters(NamedTuple): rewards_nonce: int + validators_nonce: int paused: bool oracles: List[ChecksumAddress] + + +class OraclesVotes(NamedTuple): + rewards: List[RewardVote] + distributor: List[DistributorVote] + validators: List[ValidatorsVote] diff --git a/oracle/keeper/utils.py b/oracle/keeper/utils.py index 03eeddd8..6271cc73 100644 --- a/oracle/keeper/utils.py +++ b/oracle/keeper/utils.py @@ -1,3 +1,4 @@ +import json import logging import time from collections import Counter @@ -12,13 +13,17 @@ from web3.contract import Contract, ContractFunction from web3.types import TxParams -from oracle.keeper.typings import Parameters +from oracle.keeper.typings import OraclesVotes, Parameters from oracle.oracle.distributor.common.types import DistributorVote +from oracle.oracle.rewards.types import RewardVote +from oracle.oracle.validators.types import ValidatorsVote from oracle.settings import ( CONFIRMATION_BLOCKS, DISTRIBUTOR_VOTE_FILENAME, NETWORK_CONFIG, + REWARD_VOTE_FILENAME, TRANSACTION_TIMEOUT, + VALIDATOR_VOTE_FILENAME, ) logger = logging.getLogger(__name__) @@ -40,6 +45,10 @@ def get_keeper_params( "target": oracles_contract.address, "callData": oracles_contract.encodeABI(fn_name="currentRewardsNonce"), }, + { + "target": oracles_contract.address, + "callData": oracles_contract.encodeABI(fn_name="currentValidatorsNonce"), + }, { "target": oracles_contract.address, "callData": oracles_contract.encodeABI( @@ -51,7 +60,8 @@ def get_keeper_params( paused = bool(Web3.toInt(primitive=response[0])) rewards_nonce = Web3.toInt(primitive=response[1]) - total_oracles = Web3.toInt(primitive=response[2]) + validators_nonce = Web3.toInt(primitive=response[2]) + total_oracles = Web3.toInt(primitive=response[3]) calls = [] for i in range(total_oracles): calls.append( @@ -70,6 +80,7 @@ def get_keeper_params( return Parameters( paused=paused, rewards_nonce=rewards_nonce, + validators_nonce=validators_nonce, oracles=oracles, ) @@ -93,6 +104,26 @@ def validate_vote_signature( return True +def check_reward_vote( + web3_client: Web3, vote: RewardVote, oracle: ChecksumAddress +) -> bool: + """Checks whether oracle's reward vote is correct.""" + try: + encoded_data: bytes = web3_client.codec.encode_abi( + ["uint256", "uint256", "uint256"], + [ + int(vote["nonce"]), + int(vote["activated_validators"]), + int(vote["total_rewards"]), + ], + ) + return validate_vote_signature( + web3_client, encoded_data, oracle, vote["signature"] + ) + except: # noqa: E722 + return False + + def check_distributor_vote( web3_client: Web3, vote: DistributorVote, oracle: ChecksumAddress ) -> bool: @@ -109,36 +140,83 @@ def check_distributor_vote( return False +def check_validator_vote( + web3_client: Web3, vote: ValidatorsVote, oracle: ChecksumAddress +) -> bool: + """Checks whether oracle's validator vote is correct.""" + try: + deposit_data_payloads = [] + for deposit_data in vote["deposit_data"]: + deposit_data_payloads.append( + ( + deposit_data["operator"], + deposit_data["withdrawal_credentials"], + deposit_data["deposit_data_root"], + deposit_data["public_key"], + deposit_data["deposit_data_signature"], + ) + ) + encoded_data: bytes = web3_client.codec.encode_abi( + ["uint256", "(address,bytes32,bytes32,bytes,bytes)[]", "bytes32"], + [ + int(vote["nonce"]), + deposit_data_payloads, + vote["validators_deposit_root"], + ], + ) + return validate_vote_signature( + web3_client, encoded_data, oracle, vote["signature"] + ) + except: # noqa: E722 + return False + + def get_oracles_votes( web3_client: Web3, rewards_nonce: int, + validators_nonce: int, oracles: List[ChecksumAddress], -) -> List[DistributorVote]: +) -> OraclesVotes: """Fetches oracle votes that match current nonces.""" - votes = [] + votes = OraclesVotes(rewards=[], distributor=[], validators=[]) aws_bucket_name = NETWORK_CONFIG["AWS_BUCKET_NAME"] aws_region = NETWORK_CONFIG["AWS_REGION"] for oracle in oracles: - # TODO: support more aggregators (GCP, Azure, etc.) - bucket_key = f"{oracle}/{DISTRIBUTOR_VOTE_FILENAME}" - try: - response = requests.get( - f"https://{aws_bucket_name}.s3.{aws_region}.amazonaws.com/{bucket_key}" - ) - response.raise_for_status() - vote = response.json() - if "nonce" not in vote or vote["nonce"] != rewards_nonce: - continue - if not check_distributor_vote(web3_client, vote, oracle): - logger.warning( - f"Oracle {oracle} has submitted incorrect vote at {bucket_key}" + for arr, filename, correct_nonce, vote_checker in [ + (votes.rewards, REWARD_VOTE_FILENAME, rewards_nonce, check_reward_vote), + ( + votes.distributor, + DISTRIBUTOR_VOTE_FILENAME, + rewards_nonce, + check_distributor_vote, + ), + ( + votes.validators, + VALIDATOR_VOTE_FILENAME, + validators_nonce, + check_validator_vote, + ), + ]: + # TODO: support more aggregators (GCP, Azure, etc.) + bucket_key = f"{oracle}/{filename}" + try: + response = requests.get( + f"https://{aws_bucket_name}.s3.{aws_region}.amazonaws.com/{bucket_key}" ) - continue - - votes.append(vote) - except: # noqa: E722 - pass + response.raise_for_status() + vote = response.json() + if "nonce" not in vote or vote["nonce"] != correct_nonce: + continue + if not vote_checker(web3_client, vote, oracle): + logger.warning( + f"Oracle {oracle} has submitted incorrect vote at {bucket_key}" + ) + continue + + arr.append(vote) + except: # noqa: E722 + pass return votes @@ -205,18 +283,55 @@ def submit_votes( votes = get_oracles_votes( web3_client=web3_client, rewards_nonce=params.rewards_nonce, + validators_nonce=params.validators_nonce, oracles=params.oracles, ) total_oracles = len(params.oracles) - counter = Counter([(vote["merkle_root"], vote["merkle_proofs"]) for vote in votes]) + counter = Counter( + [ + (vote["total_rewards"], vote["activated_validators"]) + for vote in votes.rewards + ] + ) + most_voted = counter.most_common(1) + if most_voted and can_submit(most_voted[0][1], total_oracles): + total_rewards, activated_validators = most_voted[0][0] + signatures: List[HexStr] = [] + i = 0 + while not can_submit(len(signatures), total_oracles): + vote = votes.rewards[i] + if (total_rewards, activated_validators) == ( + vote["total_rewards"], + vote["activated_validators"], + ): + signatures.append(vote["signature"]) + + i += 1 + + logger.info( + f"Submitting total rewards update:" + f' rewards={Web3.fromWei(int(total_rewards), "ether")},' + f" activated validators={activated_validators}" + ) + submit_update( + web3_client, + oracles_contract.functions.submitRewards( + int(total_rewards), int(activated_validators), signatures + ), + ) + logger.info("Total rewards has been successfully updated") + + counter = Counter( + [(vote["merkle_root"], vote["merkle_proofs"]) for vote in votes.distributor] + ) most_voted = counter.most_common(1) if most_voted and can_submit(most_voted[0][1], total_oracles): merkle_root, merkle_proofs = most_voted[0][0] signatures = [] i = 0 while not can_submit(len(signatures), total_oracles): - vote = votes[i] + vote = votes.distributor[i] if (merkle_root, merkle_proofs) == ( vote["merkle_root"], vote["merkle_proofs"], @@ -234,3 +349,67 @@ def submit_votes( ), ) logger.info("Merkle Distributor has been successfully updated") + + counter = Counter( + [ + ( + json.dumps(vote["deposit_data"], sort_keys=True), + vote["validators_deposit_root"], + ) + for vote in votes.validators + ] + ) + most_voted = counter.most_common(1) + if most_voted and can_submit(most_voted[0][1], total_oracles): + deposit_data, validators_deposit_root = most_voted[0][0] + deposit_data = json.loads(deposit_data) + + signatures = [] + i = 0 + while not can_submit(len(signatures), total_oracles): + vote = votes.validators[i] + if (deposit_data, validators_deposit_root) == ( + vote["deposit_data"], + vote["validators_deposit_root"], + ): + signatures.append(vote["signature"]) + i += 1 + + validators_vote: ValidatorsVote = next( + vote + for vote in votes.validators + if (deposit_data, validators_deposit_root) + == ( + vote["deposit_data"], + vote["validators_deposit_root"], + ) + ) + logger.info( + f"Submitting validator(s) registration: " + f"count={len(validators_vote['deposit_data'])}, " + f"deposit root={validators_deposit_root}" + ) + submit_deposit_data = [] + submit_merkle_proofs = [] + for deposit in deposit_data: + submit_deposit_data.append( + dict( + operator=deposit["operator"], + withdrawalCredentials=deposit["withdrawal_credentials"], + depositDataRoot=deposit["deposit_data_root"], + publicKey=deposit["public_key"], + signature=deposit["deposit_data_signature"], + ) + ) + submit_merkle_proofs.append(deposit["proof"]) + + submit_update( + web3_client, + oracles_contract.functions.registerValidators( + submit_deposit_data, + submit_merkle_proofs, + validators_deposit_root, + signatures, + ), + ) + logger.info("Validator(s) registration has been successfully submitted") diff --git a/oracle/networks.py b/oracle/networks.py index 356d5d75..912cac77 100644 --- a/oracle/networks.py +++ b/oracle/networks.py @@ -2,6 +2,7 @@ from decouple import Csv, config from ens.constants import EMPTY_ADDR_HEX +from eth_typing import HexStr from web3 import Web3 MAINNET = "mainnet" @@ -34,6 +35,19 @@ cast=Csv(), ), ETH1_ENDPOINT=config("ETH1_ENDPOINT", default=""), + ETH2_ENDPOINT=config("ETH2_ENDPOINT", default=""), + VALIDATORS_FETCH_CHUNK_SIZE=config( + "VALIDATORS_FETCH_CHUNK_SIZE", + default=100, + cast=int, + ), + VALIDATORS_BATCH_SIZE=config( + "VALIDATORS_BATCH_SIZE", + default=10, + cast=int, + ), + SLOTS_PER_EPOCH=32, + SECONDS_PER_SLOT=12, ORACLES_CONTRACT_ADDRESS=Web3.toChecksumAddress( "0x8a887282E67ff41d36C0b7537eAB035291461AcD" ), @@ -52,6 +66,9 @@ DISTRIBUTOR_FALLBACK_ADDRESS=Web3.toChecksumAddress( "0x144a98cb1CdBb23610501fE6108858D9B7D24934" ), + WITHDRAWAL_CREDENTIALS=HexStr( + "0x0100000000000000000000002296e122c1a20fca3cac3371357bdad3be0df079" + ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), ORACLE_STAKEWISE_OPERATOR=Web3.toChecksumAddress( "0x5fc60576b92c5ce5c341c43e3b2866eb9e0cddd1" @@ -98,6 +115,19 @@ cast=Csv(), ), ETH1_ENDPOINT=config("ETH1_ENDPOINT", default=""), + ETH2_ENDPOINT=config("ETH2_ENDPOINT", default=""), + VALIDATORS_FETCH_CHUNK_SIZE=config( + "VALIDATORS_FETCH_CHUNK_SIZE", + default=100, + cast=int, + ), + VALIDATORS_BATCH_SIZE=config( + "VALIDATORS_BATCH_SIZE", + default=10, + cast=int, + ), + SLOTS_PER_EPOCH=32, + SECONDS_PER_SLOT=12, ORACLES_CONTRACT_ADDRESS=Web3.toChecksumAddress( "0x16c0020fC507C675eA8A3A817416adA3D95c661b" ), @@ -116,6 +146,9 @@ DISTRIBUTOR_FALLBACK_ADDRESS=Web3.toChecksumAddress( "0x6C7692dB59FDC7A659208EEE57C2c876aE54a448" ), + WITHDRAWAL_CREDENTIALS=HexStr( + "0x0100000000000000000000005c631621b897f467dd6a91855a0bc97d77b78dc0" + ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, WITHDRAWALS_GENESIS_EPOCH=194048, @@ -159,6 +192,19 @@ cast=Csv(), ), ETH1_ENDPOINT=config("ETH1_ENDPOINT", default=""), + ETH2_ENDPOINT=config("ETH2_ENDPOINT", default=""), + VALIDATORS_FETCH_CHUNK_SIZE=config( + "VALIDATORS_FETCH_CHUNK_SIZE", + default=100, + cast=int, + ), + VALIDATORS_BATCH_SIZE=config( + "VALIDATORS_BATCH_SIZE", + default=10, + cast=int, + ), + SLOTS_PER_EPOCH=32, + SECONDS_PER_SLOT=12, ORACLES_CONTRACT_ADDRESS=Web3.toChecksumAddress( "0x531b9D9cb268E88D53A87890699bbe31326A6f08" ), @@ -177,6 +223,9 @@ DISTRIBUTOR_FALLBACK_ADDRESS=Web3.toChecksumAddress( "0x1867c96601bc5fE24F685d112314B8F3Fe228D5A" ), + WITHDRAWAL_CREDENTIALS=HexStr( + "0x010000000000000000000000040f15c6b5bfc5f324ecab5864c38d4e1eef4218" + ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, WITHDRAWALS_GENESIS_EPOCH=162304, @@ -217,6 +266,19 @@ cast=Csv(), ), ETH1_ENDPOINT=config("ETH1_ENDPOINT", default=""), + ETH2_ENDPOINT=config("ETH2_ENDPOINT", default=""), + VALIDATORS_FETCH_CHUNK_SIZE=config( + "VALIDATORS_FETCH_CHUNK_SIZE", + default=100, + cast=int, + ), + VALIDATORS_BATCH_SIZE=config( + "VALIDATORS_BATCH_SIZE", + default=10, + cast=int, + ), + SLOTS_PER_EPOCH=32, + SECONDS_PER_SLOT=12, ORACLES_CONTRACT_ADDRESS=Web3.toChecksumAddress( "0x4E9CA30186E829D7712ADFEEE491c0c6C46E1AED" ), @@ -235,6 +297,9 @@ DISTRIBUTOR_FALLBACK_ADDRESS=Web3.toChecksumAddress( "0x66D6c253084d8d51c7CFfDb3C188A0b53D998a3d" ), + WITHDRAWAL_CREDENTIALS=HexStr( + "0x0100000000000000000000006dfc9682e3c3263758ad96e2b2ba9822167f81ee" + ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, WITHDRAWALS_GENESIS_EPOCH=162304, @@ -278,6 +343,19 @@ cast=Csv(), ), ETH1_ENDPOINT=config("ETH1_ENDPOINT", default=""), + ETH2_ENDPOINT=config("ETH2_ENDPOINT", default=""), + VALIDATORS_FETCH_CHUNK_SIZE=config( + "VALIDATORS_FETCH_CHUNK_SIZE", + default=100, + cast=int, + ), + VALIDATORS_BATCH_SIZE=config( + "VALIDATORS_BATCH_SIZE", + default=10, + cast=int, + ), + SLOTS_PER_EPOCH=16, + SECONDS_PER_SLOT=5, ORACLES_CONTRACT_ADDRESS=Web3.toChecksumAddress( "0xa6D123620Ea004cc5158b0ec260E934bd45C78c1" ), @@ -296,6 +374,9 @@ DISTRIBUTOR_FALLBACK_ADDRESS=Web3.toChecksumAddress( "0x8737f638E9af54e89ed9E1234dbC68B115CD169e" ), + WITHDRAWAL_CREDENTIALS=HexStr( + "0x010000000000000000000000fc9b67b6034f6b306ea9bd8ec1baf3efa2490394" + ), ORACLE_PRIVATE_KEY=config("ORACLE_PRIVATE_KEY", default=""), ORACLE_STAKEWISE_OPERATOR=EMPTY_ADDR_HEX, WITHDRAWALS_GENESIS_EPOCH=648704, diff --git a/oracle/oracle/common/clients.py b/oracle/oracle/common/clients.py index 571522a9..d28a57af 100644 --- a/oracle/oracle/common/clients.py +++ b/oracle/oracle/common/clients.py @@ -13,6 +13,7 @@ gql_logger.setLevel(logging.ERROR) logger = logging.getLogger(__name__) + # set default GQL query execution timeout to 45 seconds EXECUTE_TIMEOUT = 45 @@ -64,6 +65,17 @@ async def execute_uniswap_v3_gql_query( ) +async def execute_ethereum_gql_query( + network: str, query: DocumentNode, variables: Dict +) -> Dict: + """Executes GraphQL query.""" + return await execute_gql_query( + subgraph_urls=get_network_config(network)["ETHEREUM_SUBGRAPH_URLS"], + query=query, + variables=variables, + ) + + async def _execute_base_gql_paginated_query( subgraph_urls: str, query: DocumentNode, variables: Dict, paginated_field: str ) -> List: @@ -108,6 +120,18 @@ async def execute_uniswap_v3_paginated_gql_query( ) +async def execute_ethereum_paginated_gql_query( + network: str, query: DocumentNode, variables: Dict, paginated_field: str +) -> List: + """Executes ETH query.""" + return await _execute_base_gql_paginated_query( + subgraph_urls=get_network_config(network)["ETHEREUM_SUBGRAPH_URLS"], + query=query, + variables=variables, + paginated_field=paginated_field, + ) + + @backoff.on_exception(backoff.expo, Exception, max_time=300, logger=gql_logger) async def execute_gql_query( subgraph_urls: str, query: DocumentNode, variables: Dict diff --git a/oracle/oracle/common/eth1.py b/oracle/oracle/common/eth1.py index 2ca72a66..3d9aa7ff 100644 --- a/oracle/oracle/common/eth1.py +++ b/oracle/oracle/common/eth1.py @@ -14,6 +14,8 @@ VOTING_PARAMETERS_QUERY, ) from oracle.oracle.distributor.common.types import DistributorVotingParameters +from oracle.oracle.rewards.types import RewardsVotingParameters +from oracle.oracle.validators.types import ValidatorVotingParameters from oracle.settings import CONFIRMATION_BLOCKS, NETWORK_CONFIG, NETWORKS logger = logging.getLogger(__name__) @@ -25,7 +27,9 @@ class Block(TypedDict): class VotingParameters(TypedDict): + rewards: RewardsVotingParameters distributor: DistributorVotingParameters + validator: ValidatorVotingParameters def get_web3_client() -> Web3: @@ -130,6 +134,12 @@ async def get_voting_parameters( "merkleProofs": None, } + rewards = RewardsVotingParameters( + rewards_nonce=int(network["oraclesRewardsNonce"]), + total_rewards=Wei(int(reward_token["totalRewards"])), + total_fees=Wei(int(reward_token["totalFees"])), + rewards_updated_at_timestamp=Timestamp(int(reward_token["updatedAtTimestamp"])), + ) distributor = DistributorVotingParameters( rewards_nonce=int(network["oraclesRewardsNonce"]), from_block=BlockNumber(int(distributor["rewardsUpdatedAtBlock"])), @@ -140,8 +150,16 @@ async def get_voting_parameters( protocol_reward=Wei(int(reward_token["protocolPeriodReward"])), distributor_reward=Wei(int(reward_token["distributorPeriodReward"])), ) + network = result["networks"][0] + pool = result["pools"][0] + validator = ValidatorVotingParameters( + validators_nonce=int(network["oraclesValidatorsNonce"]), + pool_balance=Wei(int(pool["balance"])), + ) - return VotingParameters(distributor=distributor) + return VotingParameters( + rewards=rewards, distributor=distributor, validator=validator + ) def _find_max_consensus(items, func): diff --git a/oracle/oracle/common/graphql_queries.py b/oracle/oracle/common/graphql_queries.py index 7c13e11a..428fc687 100644 --- a/oracle/oracle/common/graphql_queries.py +++ b/oracle/oracle/common/graphql_queries.py @@ -44,9 +44,31 @@ rewardsUpdatedAtBlock } rewardEthTokens(block: { number: $block_number }) { + totalRewards + totalFees distributorPeriodReward protocolPeriodReward updatedAtBlock + updatedAtTimestamp + } + networks(block: { number: $block_number }) { + oraclesValidatorsNonce + } + pools(block: { number: $block_number }) { + balance + } + } +""" +) + +VALIDATOR_VOTING_PARAMETERS_QUERY = gql( + """ + query getVotingParameters($block_number: Int) { + networks(block: { number: $block_number }) { + oraclesValidatorsNonce + } + pools(block: { number: $block_number }) { + balance } } """ @@ -64,6 +86,22 @@ """ ) +REGISTERED_VALIDATORS_QUERY = gql( + """ + query getValidators($block_number: Int, $last_id: ID) { + validators( + block: { number: $block_number } + where: { id_gt: $last_id } + first: 1000 + orderBy: id + orderDirection: asc + ) { + id + } + } +""" +) + ORACLE_QUERY = gql( """ query getOracles($oracle_address: ID) { @@ -356,6 +394,41 @@ """ ) +OPERATORS_QUERY = gql( + """ + query getOperators($block_number: Int) { + operators( + block: { number: $block_number } + where: { committed: true } + orderBy: id + orderDirection: asc + ) { + id + depositDataMerkleProofs + depositDataIndex + } + } +""" +) + +LAST_VALIDATORS_QUERY = gql( + """ + query getValidators($block_number: Int) { + validators( + block: { number: $block_number } + orderBy: createdAtBlock + orderDirection: desc + first: 1 + ) { + operator { + id + } + } + } +""" +) + + PARTNERS_QUERY = gql( """ query getPartners($block_number: Int) { @@ -369,3 +442,31 @@ } """ ) + +VALIDATOR_REGISTRATIONS_QUERY = gql( + """ + query getValidatorRegistrations($block_number: Int, $public_key: Bytes) { + validatorRegistrations( + block: { number: $block_number } + where: { publicKey: $public_key } + ) { + publicKey + } + } +""" +) + +VALIDATOR_REGISTRATIONS_LATEST_INDEX_QUERY = gql( + """ + query getValidatorRegistrations($block_number: Int) { + validatorRegistrations( + block: { number: $block_number } + first: 1 + orderBy: createdAtBlock + orderDirection: desc + ) { + validatorsDepositRoot + } + } +""" +) diff --git a/oracle/oracle/main.py b/oracle/oracle/main.py index c9a8e2e3..d7b18125 100644 --- a/oracle/oracle/main.py +++ b/oracle/oracle/main.py @@ -17,6 +17,9 @@ ) from oracle.oracle.distributor.controller import DistributorController from oracle.oracle.health_server import oracle_routes +from oracle.oracle.rewards.controller import RewardsController +from oracle.oracle.rewards.eth2 import get_finality_checkpoints, get_genesis +from oracle.oracle.validators.controller import ValidatorsController from oracle.oracle.vote import submit_vote from oracle.settings import ( ENABLE_HEALTH_SERVER, @@ -51,11 +54,22 @@ async def main() -> None: # wait for interrupt interrupt_handler = InterruptHandler() + # fetch ETH2 genesis + genesis = await get_genesis(session) + + rewards_controller = RewardsController( + aiohttp_session=session, + genesis_timestamp=int(genesis["genesis_time"]), + oracle=oracle_account, + ) distributor_controller = DistributorController(oracle_account) + validators_controller = ValidatorsController(oracle_account) await process_network( interrupt_handler, + rewards_controller, distributor_controller, + validators_controller, ) await session.close() @@ -80,6 +94,14 @@ async def init_checks(oracle_account, session): ] logger.info(f"Connected to graph nodes at {parsed_uris}") + # check ETH2 API connection + logger.info("Checking connection to ETH2 node...") + await get_finality_checkpoints(session) + parsed_uri = "{uri.scheme}://{uri.netloc}".format( + uri=urlparse(NETWORK_CONFIG["ETH2_ENDPOINT"]) + ) + logger.info(f"Connected to ETH2 node at {parsed_uri}") + # check ETH1 connection logger.info("Checking connection to ETH1 node...") block_number = get_web3_client().eth.block_number @@ -93,13 +115,16 @@ async def init_checks(oracle_account, session): async def process_network( interrupt_handler: InterruptHandler, + rewards_ctrl: RewardsController, distributor_ctrl: DistributorController, + validators_ctrl: ValidatorsController, ) -> None: while not interrupt_handler.exit: try: # fetch current finalized ETH1 block data finalized_block = await get_finalized_block(NETWORK) current_block_number = finalized_block["block_number"] + current_timestamp = finalized_block["timestamp"] latest_block_number = await get_latest_block_number(NETWORK) graphs_synced = await has_synced_block(NETWORK, latest_block_number) @@ -110,7 +135,21 @@ async def process_network( NETWORK, current_block_number ) - await distributor_ctrl.process(voting_parameters["distributor"]) + await asyncio.gather( + # check and update staking rewards + rewards_ctrl.process( + voting_params=voting_parameters["rewards"], + current_block_number=current_block_number, + current_timestamp=current_timestamp, + ), + # check and update merkle distributor + distributor_ctrl.process(voting_parameters["distributor"]), + # process validators registration + validators_ctrl.process( + voting_params=voting_parameters["validator"], + block_number=latest_block_number, + ), + ) except BaseException as e: logger.exception(e) finally: diff --git a/oracle/oracle/rewards/__init__.py b/oracle/oracle/rewards/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oracle/oracle/rewards/controller.py b/oracle/oracle/rewards/controller.py new file mode 100644 index 00000000..ba73d780 --- /dev/null +++ b/oracle/oracle/rewards/controller.py @@ -0,0 +1,270 @@ +import asyncio +import concurrent.futures +import logging +from concurrent.futures import as_completed +from datetime import datetime, timezone +from typing import Union + +from aiohttp import ClientSession +from eth_account.signers.local import LocalAccount +from eth_typing import BlockNumber, HexStr +from web3 import Web3 +from web3.types import Timestamp, Wei + +from oracle.networks import GNOSIS_CHAIN +from oracle.oracle.common.eth1 import get_web3_client +from oracle.oracle.rewards.eth1 import get_withdrawals +from oracle.oracle.rewards.types import ( + RegisteredValidatorsPublicKeys, + RewardsVotingParameters, + RewardVote, +) +from oracle.oracle.utils import save +from oracle.oracle.vote import submit_vote +from oracle.settings import ( + MGNO_RATE, + NETWORK, + NETWORK_CONFIG, + REWARD_VOTE_FILENAME, + WAD, +) + +from .eth1 import get_registered_validators_public_keys +from .eth2 import ( + PENDING_STATUSES, + ValidatorStatus, + get_execution_block, + get_finality_checkpoints, + get_validators, +) + +logger = logging.getLogger(__name__) +w3 = Web3() + + +class RewardsController(object): + """Updates total rewards and activated validators number.""" + + def __init__( + self, + aiohttp_session: ClientSession, + genesis_timestamp: int, + oracle: LocalAccount, + ) -> None: + self.deposit_amount: Wei = Web3.toWei(32, "ether") + self.aiohttp_session = aiohttp_session + self.genesis_timestamp = genesis_timestamp + self.oracle = oracle + self.sync_period = NETWORK_CONFIG["SYNC_PERIOD"] + self.slots_per_epoch = NETWORK_CONFIG["SLOTS_PER_EPOCH"] + self.seconds_per_epoch = ( + self.slots_per_epoch * NETWORK_CONFIG["SECONDS_PER_SLOT"] + ) + self.deposit_token_symbol = NETWORK_CONFIG["DEPOSIT_TOKEN_SYMBOL"] + self.withdrawals_genesis_epoch = NETWORK_CONFIG["WITHDRAWALS_GENESIS_EPOCH"] + self.last_vote_total_rewards = None + + @save + async def process( + self, + voting_params: RewardsVotingParameters, + current_block_number: BlockNumber, + current_timestamp: Timestamp, + ) -> None: + """Submits vote for the new total rewards and activated validators to the IPFS.""" + # check whether it's voting time + last_update_time = datetime.utcfromtimestamp( + voting_params["rewards_updated_at_timestamp"] + ) + next_update_time: datetime = last_update_time + self.sync_period + current_time: datetime = datetime.utcfromtimestamp(current_timestamp) + while next_update_time + self.sync_period <= current_time: + next_update_time += self.sync_period + + # skip submitting vote if too early or vote has been already submitted + if next_update_time > current_time: + return + + # fetch pool validator BLS public keys + public_keys = await get_registered_validators_public_keys(current_block_number) + + # calculate current ETH2 epoch + update_timestamp = int( + next_update_time.replace(tzinfo=timezone.utc).timestamp() + ) + update_epoch: int = ( + update_timestamp - self.genesis_timestamp + ) // self.seconds_per_epoch + + logger.info( + f"Voting for new total rewards with parameters:" + f" timestamp={update_timestamp}, epoch={update_epoch}" + ) + + # wait for the epoch to get finalized + checkpoints = await get_finality_checkpoints(self.aiohttp_session) + while update_epoch > int(checkpoints["finalized"]["epoch"]): + logger.info(f"Waiting for the epoch {update_epoch} to finalize...") + await asyncio.sleep(360) + checkpoints = await get_finality_checkpoints(self.aiohttp_session) + + state_id = str(update_epoch * self.slots_per_epoch) + total_rewards: Wei = voting_params["total_fees"] + validator_indexes, balance_rewards = await self.calculate_balance_rewards( + public_keys, state_id + ) + total_rewards += balance_rewards + activated_validators = len(validator_indexes) + + withdrawals_rewards = Wei(0) + if ( + self.withdrawals_genesis_epoch + and update_epoch >= self.withdrawals_genesis_epoch + ): + withdrawals_rewards = await self.calculate_withdrawal_rewards( + validator_indexes=validator_indexes, + current_slot=int(state_id), + ) + total_rewards += withdrawals_rewards + + pretty_total_rewards = self.format_ether(total_rewards) + logger.info( + f"Retrieved pool validator rewards:" + f" total={pretty_total_rewards}," + f" balance_rewards={self.format_ether(balance_rewards)}," + f" withdrawals_rewards={self.format_ether(withdrawals_rewards)}," + f" fees={self.format_ether(voting_params['total_fees'])}" + ) + if not total_rewards: + logger.info("No staking rewards, waiting for validators to be activated...") + return + + if total_rewards < voting_params["total_rewards"]: + # rewards were reduced -> don't mint new ones + logger.warning( + f"Total rewards decreased since the previous update:" + f" current={pretty_total_rewards}," + f' previous={self.format_ether(voting_params["total_rewards"])}' + ) + total_rewards = voting_params["total_rewards"] + pretty_total_rewards = self.format_ether(total_rewards) + + # submit vote + logger.info( + f"Submitting rewards vote:" + f" nonce={voting_params['rewards_nonce']}," + f" total rewards={pretty_total_rewards}," + f" activated validators={activated_validators}" + ) + + current_nonce = voting_params["rewards_nonce"] + encoded_data: bytes = w3.codec.encode_abi( + ["uint256", "uint256", "uint256"], + [current_nonce, activated_validators, total_rewards], + ) + vote = RewardVote( + signature=HexStr(""), + nonce=current_nonce, + activated_validators=activated_validators, + total_rewards=str(total_rewards), + ) + submit_vote( + oracle=self.oracle, + encoded_data=encoded_data, + vote=vote, + name=REWARD_VOTE_FILENAME, + ) + logger.info("Rewards vote has been successfully submitted") + + self.last_vote_total_rewards = total_rewards + + async def calculate_balance_rewards( + self, public_keys: RegisteredValidatorsPublicKeys, state_id: str + ) -> tuple[set[int], Wei]: + validator_indexes = set() + rewards = 0 + chunk_size = NETWORK_CONFIG["VALIDATORS_FETCH_CHUNK_SIZE"] + # fetch balances in chunks + for i in range(0, len(public_keys), chunk_size): + validators = await get_validators( + session=self.aiohttp_session, + public_keys=public_keys[i : i + chunk_size], + state_id=state_id, + ) + for validator in validators: + if ValidatorStatus(validator["status"]) in PENDING_STATUSES: + continue + + validator_indexes.add(int(validator["index"])) + validator_reward = ( + Web3.toWei(validator["balance"], "gwei") - self.deposit_amount + ) + if NETWORK == GNOSIS_CHAIN: + # apply mGNO <-> GNO exchange rate + validator_reward = Wei(int(validator_reward * WAD // MGNO_RATE)) + rewards += validator_reward + + return validator_indexes, Wei(rewards) + + async def calculate_withdrawal_rewards( + self, validator_indexes: set[int], current_slot: int + ) -> Wei: + withdrawals_amount = 0 + from_block = await self.get_withdrawals_from_block(current_slot) + to_block = await self.get_withdrawals_to_block(current_slot) + if not from_block or from_block >= to_block: + return Wei(0) + + logger.info( + f"Retrieving pool validator withdrawals " + f"from block: {from_block} to block: {to_block}" + ) + execution_client = get_web3_client() + + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + futures = [ + executor.submit(get_withdrawals, execution_client, block_number) + for block_number in range(from_block, to_block) + ] + for future in as_completed(futures): + withdrawals = future.result() + for withdrawal in withdrawals: + if withdrawal["validator_index"] in validator_indexes: + withdrawals_amount += withdrawal["amount"] + + withdrawals_amount = Web3.toWei(withdrawals_amount, "gwei") + if NETWORK == GNOSIS_CHAIN: + # apply mGNO <-> GNO exchange rate + withdrawals_amount = Wei(int(withdrawals_amount * WAD // MGNO_RATE)) + return withdrawals_amount + + async def get_withdrawals_from_block(self, current_slot: int) -> BlockNumber | None: + slot_number = self.withdrawals_genesis_epoch * self.slots_per_epoch + while slot_number <= current_slot: + from_block = await get_execution_block( + session=self.aiohttp_session, slot_number=slot_number + ) + if from_block: + return from_block + slot_number += 1 + + async def get_withdrawals_to_block(self, current_slot: int) -> BlockNumber | None: + slot_number = current_slot + withdrawals_slot_number = self.withdrawals_genesis_epoch * self.slots_per_epoch + while slot_number >= withdrawals_slot_number: + to_block = await get_execution_block( + session=self.aiohttp_session, slot_number=slot_number + ) + if to_block: + return to_block + slot_number -= 1 + + def format_ether(self, value: Union[str, int, Wei]) -> str: + """Converts Wei value.""" + _value = int(value) + if _value < 0: + formatted_value = f'-{Web3.fromWei(abs(_value), "ether")}' + else: + formatted_value = f'{Web3.fromWei(_value, "ether")}' + + return f"{formatted_value} {self.deposit_token_symbol}" diff --git a/oracle/oracle/rewards/eth1.py b/oracle/oracle/rewards/eth1.py new file mode 100644 index 00000000..cfcce4e8 --- /dev/null +++ b/oracle/oracle/rewards/eth1.py @@ -0,0 +1,38 @@ +from typing import List + +from web3 import Web3 +from web3.types import BlockNumber + +from oracle.oracle.common.clients import execute_sw_gql_paginated_query +from oracle.oracle.common.graphql_queries import REGISTERED_VALIDATORS_QUERY +from oracle.oracle.rewards.types import Withdrawal +from oracle.settings import NETWORK + +from .types import RegisteredValidatorsPublicKeys + + +async def get_registered_validators_public_keys( + block_number: BlockNumber, +) -> RegisteredValidatorsPublicKeys: + """Fetches pool validators public keys.""" + validators: List = await execute_sw_gql_paginated_query( + network=NETWORK, + query=REGISTERED_VALIDATORS_QUERY, + variables=dict(block_number=block_number), + paginated_field="validators", + ) + return list(set([val["id"] for val in validators])) + + +def get_withdrawals( + execution_client: Web3, block_number: BlockNumber +) -> list[Withdrawal]: + """Fetches block withdrawals.""" + block = execution_client.eth.get_block(block_number) + return [ + Withdrawal( + validator_index=int(withdrawal["validatorIndex"], 0), + amount=int(withdrawal["amount"], 0), + ) + for withdrawal in block.get("withdrawals", []) + ] diff --git a/oracle/oracle/rewards/eth2.py b/oracle/oracle/rewards/eth2.py new file mode 100644 index 00000000..9fbe17d1 --- /dev/null +++ b/oracle/oracle/rewards/eth2.py @@ -0,0 +1,83 @@ +from enum import Enum +from typing import Dict, List + +import backoff +from aiohttp import ClientSession +from eth_typing import BlockNumber, HexStr + +from oracle.settings import NETWORK_CONFIG + + +class ValidatorStatus(Enum): + """Validator statuses in beacon chain""" + + PENDING_INITIALIZED = "pending_initialized" + PENDING_QUEUED = "pending_queued" + ACTIVE_ONGOING = "active_ongoing" + ACTIVE_EXITING = "active_exiting" + ACTIVE_SLASHED = "active_slashed" + EXITED_UNSLASHED = "exited_unslashed" + EXITED_SLASHED = "exited_slashed" + WITHDRAWAL_POSSIBLE = "withdrawal_possible" + WITHDRAWAL_DONE = "withdrawal_done" + + +PENDING_STATUSES = [ValidatorStatus.PENDING_INITIALIZED, ValidatorStatus.PENDING_QUEUED] + + +@backoff.on_exception(backoff.expo, Exception, max_time=900) +async def get_finality_checkpoints( + session: ClientSession, state_id: str = "head" +) -> Dict: + """Fetches finality checkpoints.""" + endpoint = f"{NETWORK_CONFIG['ETH2_ENDPOINT']}/eth/v1/beacon/states/{state_id}/finality_checkpoints" + async with session.get(endpoint) as response: + response.raise_for_status() + return (await response.json())["data"] + + +@backoff.on_exception(backoff.expo, Exception, max_time=900) +async def get_validators( + session: ClientSession, + public_keys: List[HexStr], + state_id: str = "head", +) -> List[Dict]: + """Fetches validators.""" + if not public_keys: + return [] + + _endpoint = NETWORK_CONFIG["ETH2_ENDPOINT"] + endpoint = f"{_endpoint}/eth/v1/beacon/states/{state_id}/validators?id={'&id='.join(public_keys)}" + + async with session.get(endpoint) as response: + response.raise_for_status() + return (await response.json())["data"] + + +@backoff.on_exception(backoff.expo, Exception, max_time=900) +async def get_genesis(session: ClientSession) -> Dict: + """Fetches beacon chain genesis.""" + endpoint = f"{NETWORK_CONFIG['ETH2_ENDPOINT']}/eth/v1/beacon/genesis" + async with session.get(endpoint) as response: + response.raise_for_status() + return (await response.json())["data"] + + +@backoff.on_exception(backoff.expo, Exception, max_time=900) +async def get_execution_block( + session: ClientSession, slot_number: int +) -> BlockNumber | None: + """Fetches beacon chain slot data.""" + + endpoint = f"{NETWORK_CONFIG['ETH2_ENDPOINT']}/eth/v2/beacon/blocks/{slot_number}" + async with session.get(endpoint) as response: + if response.status == 404: + return None + response.raise_for_status() + return BlockNumber( + int( + (await response.json())["data"]["message"]["body"]["execution_payload"][ + "block_number" + ] + ) + ) diff --git a/oracle/oracle/rewards/tests/__init__.py b/oracle/oracle/rewards/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oracle/oracle/rewards/tests/test_controller.py b/oracle/oracle/rewards/tests/test_controller.py new file mode 100644 index 00000000..ff3324e4 --- /dev/null +++ b/oracle/oracle/rewards/tests/test_controller.py @@ -0,0 +1,139 @@ +from unittest.mock import patch + +import aiohttp +from web3 import Web3 +from web3.types import BlockNumber, Timestamp + +from oracle.oracle.tests.common import get_test_oracle +from oracle.oracle.tests.factories import faker + +from ..controller import RewardsController +from ..types import RewardsVotingParameters, Withdrawal + +epoch = faker.random_int(150000, 250000) + +w3 = Web3() + + +def get_finality_checkpoints(*args, **kwargs): + return { + "previous_justified": { + "epoch": str(epoch - 1), + "root": faker.eth_address(), + }, + "current_justified": { + "epoch": str(epoch), + "root": faker.eth_address(), + }, + "finalized": { + "epoch": str(epoch), + "root": faker.eth_address(), + }, + } + + +def get_registered_validators_public_keys(*args, **kwargs): + return [{"id": faker.public_key()} for x in range(3)] + + +def get_withdrawals(*args, **kwargs): + return [ + Withdrawal( + validator_index=faker.random_int(), + amount=faker.random_int(0.001 * 10**9, 0.01 * 10**9), + ) + for _ in range(2) + ] + + +def get_validator(status="active_ongoing"): + return { + "index": str(faker.random_int()), + "balance": str(faker.random_int(32 * 10**9, 40 * 10**9)), + "status": status, + "validator": { + "pubkey": faker.public_key(), + "withdrawal_credentials": faker.eth_address(), + "effective_balance": str(32 * 10**9), + "slashed": False, + "activation_eligibility_epoch": faker.random_int(100, epoch), + "activation_epoch": faker.random_int(100, epoch), + "exit_epoch": faker.random_int(epoch, epoch**2), + "withdrawable_epoch": faker.random_int(epoch, epoch**2), + }, + } + + +def get_validators(*args, **kwargs): + return [ + get_validator(), + get_validator(), + get_validator(status="pending_queued"), + ] + + +sw_gql_query = [get_registered_validators_public_keys()] + + +class TestRewardController: + async def test_process_success(self): + block_number = BlockNumber(14583706) + with patch( + "oracle.oracle.rewards.eth1.execute_sw_gql_paginated_query", + side_effect=sw_gql_query, + ), patch( + "oracle.oracle.rewards.controller.get_withdrawals", + side_effect=get_withdrawals, + ), patch( + "oracle.oracle.rewards.controller.get_finality_checkpoints", + side_effect=get_finality_checkpoints, + ), patch( + "oracle.oracle.rewards.controller.get_validators", + side_effect=get_validators, + ), patch( + "oracle.oracle.rewards.controller.get_execution_block", + side_effect=block_number - faker.random_int(1, 100), + ), patch( + "oracle.oracle.rewards.controller.submit_vote", return_value=None + ) as vote_mock: + session = aiohttp.ClientSession() + rewards_nonce = faker.random_int(1000, 2000) + total_rewards = faker.wei_amount() + total_fees = faker.wei_amount() + total_rewards += total_fees + + controller = RewardsController( + aiohttp_session=session, + genesis_timestamp=1606824023, + oracle=get_test_oracle(), + ) + await controller.process( + voting_params=RewardsVotingParameters( + rewards_nonce=rewards_nonce, + total_rewards=total_rewards, + total_fees=total_fees, + rewards_updated_at_timestamp=Timestamp(1649854536), + ), + current_block_number=block_number, + current_timestamp=Timestamp(1649941516), + ) + vote = { + "signature": "", + "nonce": rewards_nonce, + "activated_validators": 2, + "total_rewards": total_rewards, + } + encoded_data: bytes = w3.codec.encode_abi( + ["uint256", "uint256", "uint256"], + [vote["nonce"], vote["activated_validators"], vote["total_rewards"]], + ) + vote["total_rewards"] = str(vote["total_rewards"]) + vote_mock.assert_called() + vote = dict( + oracle=get_test_oracle(), + encoded_data=encoded_data, + vote=vote, + name="reward-vote.json", + ) + vote_mock.assert_called_once_with(**vote) + await session.close() diff --git a/oracle/oracle/rewards/types.py b/oracle/oracle/rewards/types.py new file mode 100644 index 00000000..13362f56 --- /dev/null +++ b/oracle/oracle/rewards/types.py @@ -0,0 +1,27 @@ +from typing import List, TypedDict + +from eth_typing import HexStr +from web3.types import Timestamp, Wei + + +class RewardsVotingParameters(TypedDict): + rewards_nonce: int + total_rewards: Wei + total_fees: Wei + rewards_updated_at_timestamp: Timestamp + + +class RewardVote(TypedDict): + nonce: int + signature: HexStr + + activated_validators: int + total_rewards: str + + +class Withdrawal(TypedDict): + validator_index: int + amount: int + + +RegisteredValidatorsPublicKeys = List[HexStr] diff --git a/oracle/oracle/tests/test_clients.py b/oracle/oracle/tests/test_clients.py index 77812c25..1d922391 100644 --- a/oracle/oracle/tests/test_clients.py +++ b/oracle/oracle/tests/test_clients.py @@ -5,6 +5,8 @@ from gql import gql from oracle.oracle.common.clients import ( + execute_ethereum_gql_query, + execute_ethereum_paginated_gql_query, execute_sw_gql_paginated_query, execute_sw_gql_query, execute_uniswap_v3_gql_query, @@ -92,12 +94,14 @@ async def _execute_query(data): async def test_basic(self): for query_func in [ + execute_ethereum_gql_query, execute_sw_gql_query, execute_uniswap_v3_gql_query, ]: await self._test_basic(query_func) for query_func in [ + execute_ethereum_paginated_gql_query, execute_sw_gql_paginated_query, execute_uniswap_v3_paginated_gql_query, ]: diff --git a/oracle/oracle/validators/__init__.py b/oracle/oracle/validators/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oracle/oracle/validators/controller.py b/oracle/oracle/validators/controller.py new file mode 100644 index 00000000..94e51914 --- /dev/null +++ b/oracle/oracle/validators/controller.py @@ -0,0 +1,125 @@ +import logging +from typing import List, Set + +from eth_account.signers.local import LocalAccount +from eth_typing import BlockNumber, HexStr +from web3 import Web3 +from web3.types import Wei + +from oracle.networks import GNOSIS_CHAIN +from oracle.oracle.utils import save +from oracle.oracle.vote import submit_vote +from oracle.settings import ( + MGNO_RATE, + NETWORK, + NETWORK_CONFIG, + VALIDATOR_VOTE_FILENAME, + WAD, +) + +from .eth1 import get_validators_deposit_root, select_validator +from .types import ValidatorDepositData, ValidatorsVote, ValidatorVotingParameters + +logger = logging.getLogger(__name__) +w3 = Web3() + + +class ValidatorsController: + """Submits new validators registrations to the IPFS.""" + + def __init__(self, oracle: LocalAccount) -> None: + self.validator_deposit: Wei = Web3.toWei(32, "ether") + self.last_vote_public_key = None + self.last_vote_validators_deposit_root = None + self.oracle = oracle + self.validators_batch_size = NETWORK_CONFIG["VALIDATORS_BATCH_SIZE"] + self.last_validators_deposit_data: List[ValidatorDepositData] = [] + + @save + async def process( + self, + voting_params: ValidatorVotingParameters, + block_number: BlockNumber, + ) -> None: + """Process validators registration.""" + pool_balance = voting_params["pool_balance"] + if NETWORK == GNOSIS_CHAIN: + # apply GNO <-> mGNO exchange rate + pool_balance = Wei(int(pool_balance * MGNO_RATE // WAD)) + + # vote for up to "batch size" of the validators + validators_count: int = min( + self.validators_batch_size, pool_balance // self.validator_deposit + ) + if not validators_count: + # not enough balance to register next validator + return + + validators_deposit_data: List[ValidatorDepositData] = [] + used_pubkeys: Set[HexStr] = set() + for _ in range(validators_count): + # select next validator + # TODO: implement scoring system based on the operators performance + deposit_data = await select_validator( + block_number=block_number, + used_pubkeys=used_pubkeys, + ) + if deposit_data is None: + break + + used_pubkeys.add(deposit_data["public_key"]) + validators_deposit_data.append(deposit_data) + + if not validators_deposit_data: + logger.warning("Run out of validator keys") + return + + validators_deposit_root = await get_validators_deposit_root(block_number) + if ( + self.last_vote_validators_deposit_root == validators_deposit_root + and self.last_validators_deposit_data == validators_deposit_data + ): + # already voted for the validators + return + + # submit vote + current_nonce = voting_params["validators_nonce"] + deposit_data_payloads = [] + for deposit_data in validators_deposit_data: + operator = deposit_data["operator"] + public_key = deposit_data["public_key"] + deposit_data_payloads.append( + ( + operator, + deposit_data["withdrawal_credentials"], + deposit_data["deposit_data_root"], + public_key, + deposit_data["deposit_data_signature"], + ) + ) + logger.info( + f"Voting for the next validator: operator={operator}, public key={public_key}" + ) + + encoded_data: bytes = w3.codec.encode_abi( + ["uint256", "(address,bytes32,bytes32,bytes,bytes)[]", "bytes32"], + [current_nonce, deposit_data_payloads, validators_deposit_root], + ) + vote = ValidatorsVote( + signature=HexStr(""), + nonce=current_nonce, + validators_deposit_root=validators_deposit_root, + deposit_data=validators_deposit_data, + ) + + submit_vote( + oracle=self.oracle, + encoded_data=encoded_data, + vote=vote, + name=VALIDATOR_VOTE_FILENAME, + ) + logger.info("Submitted validators registration votes") + + # skip voting for the same validator and validators deposit root in the next check + self.last_validators_deposit_data = validators_deposit_data + self.last_vote_validators_deposit_root = validators_deposit_root diff --git a/oracle/oracle/validators/eth1.py b/oracle/oracle/validators/eth1.py new file mode 100644 index 00000000..ab24d844 --- /dev/null +++ b/oracle/oracle/validators/eth1.py @@ -0,0 +1,128 @@ +from typing import Dict, Set, Union + +from ens.constants import EMPTY_ADDR_HEX +from eth_typing import HexStr +from web3 import Web3 +from web3.types import BlockNumber + +from oracle.oracle.common.clients import ( + execute_ethereum_gql_query, + execute_sw_gql_query, +) +from oracle.oracle.common.graphql_queries import ( + LAST_VALIDATORS_QUERY, + OPERATORS_QUERY, + VALIDATOR_REGISTRATIONS_LATEST_INDEX_QUERY, + VALIDATOR_REGISTRATIONS_QUERY, +) +from oracle.oracle.common.ipfs import ipfs_fetch +from oracle.settings import NETWORK, NETWORK_CONFIG + +from .types import ValidatorDepositData + + +async def select_validator( + block_number: BlockNumber, used_pubkeys: Set[HexStr] +) -> Union[None, ValidatorDepositData]: + """Selects the next validator to register.""" + result: Dict = await execute_sw_gql_query( + network=NETWORK, + query=OPERATORS_QUERY, + variables=dict(block_number=block_number), + ) + operators = result["operators"] + result: Dict = await execute_sw_gql_query( + network=NETWORK, + query=LAST_VALIDATORS_QUERY, + variables=dict(block_number=block_number), + ) + + last_validators = result["validators"] + if last_validators: + last_operator_id = last_validators[0]["operator"]["id"] + index = _find_operator_index(operators, last_operator_id) + if index is not None and index != len(operators) - 1: + operators = operators[index + 1 :] + [operators[index]] + operators[:index] + + _move_to_bottom(operators, NETWORK_CONFIG["ORACLE_STAKEWISE_OPERATOR"]) + + for operator in operators: + merkle_proofs = operator["depositDataMerkleProofs"] + if not merkle_proofs: + continue + + operator_address = Web3.toChecksumAddress(operator["id"]) + deposit_data_index = int(operator["depositDataIndex"]) + deposit_datum = await ipfs_fetch(merkle_proofs) + + max_deposit_data_index = len(deposit_datum) - 1 + if deposit_data_index > max_deposit_data_index: + continue + + selected_deposit_data = deposit_datum[deposit_data_index] + public_key = selected_deposit_data["public_key"] + can_register = public_key not in used_pubkeys and await can_register_validator( + block_number, public_key + ) + while deposit_data_index < max_deposit_data_index and not can_register: + # the edge case when the validator was registered in previous merkle root + # and the deposit data is presented in the same. + deposit_data_index += 1 + selected_deposit_data = deposit_datum[deposit_data_index] + public_key = selected_deposit_data["public_key"] + can_register = ( + public_key not in used_pubkeys + and await can_register_validator(block_number, public_key) + ) + + if can_register: + return ValidatorDepositData( + operator=operator_address, + public_key=selected_deposit_data["public_key"], + withdrawal_credentials=selected_deposit_data["withdrawal_credentials"], + deposit_data_root=selected_deposit_data["deposit_data_root"], + deposit_data_signature=selected_deposit_data["signature"], + proof=selected_deposit_data["proof"], + ) + return None + + +async def can_register_validator(block_number: BlockNumber, public_key: HexStr) -> bool: + """Checks whether it's safe to register the validator.""" + result: Dict = await execute_ethereum_gql_query( + network=NETWORK, + query=VALIDATOR_REGISTRATIONS_QUERY, + variables=dict(block_number=block_number, public_key=public_key), + ) + registrations = result["validatorRegistrations"] + + return len(registrations) == 0 + + +async def get_validators_deposit_root(block_number: BlockNumber) -> HexStr: + """Fetches validators deposit root for protecting against operator submitting deposit prior to registration.""" + result: Dict = await execute_ethereum_gql_query( + network=NETWORK, + query=VALIDATOR_REGISTRATIONS_LATEST_INDEX_QUERY, + variables=dict(block_number=block_number), + ) + return result["validatorRegistrations"][0]["validatorsDepositRoot"] + + +def _move_to_bottom(operators, operator_id): + if operator_id == EMPTY_ADDR_HEX: + return + + index = _find_operator_index(operators, operator_id) + if index is not None: + operators.append(operators.pop(index)) + + +def _find_operator_index(operators, operator_id): + index = None + operator_id = Web3.toChecksumAddress(operator_id) + for i, operator in enumerate(operators): + if Web3.toChecksumAddress(operator["id"]) == operator_id: + index = i + break + return index diff --git a/oracle/oracle/validators/tests/__init__.py b/oracle/oracle/validators/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oracle/oracle/validators/tests/test_controller.py b/oracle/oracle/validators/tests/test_controller.py new file mode 100644 index 00000000..754926d7 --- /dev/null +++ b/oracle/oracle/validators/tests/test_controller.py @@ -0,0 +1,182 @@ +from unittest.mock import patch + +from web3 import Web3 +from web3.types import BlockNumber + +from oracle.oracle.tests.common import get_test_oracle +from oracle.oracle.tests.factories import faker + +from ..controller import ValidatorsController +from ..types import ValidatorVotingParameters + +w3 = Web3() +block_number = faker.random_int(150000, 250000) + + +def select_operators(operator, *args, **kwargs): + return { + "operators": [ + { + "id": operator, # operator + "depositDataMerkleProofs": "/ipfs/" + faker.text(max_nb_chars=20), + "depositDataIndex": "5", + }, + ] + } + + +def select_validators(*args, **kwargs): + return {"validators": []} + + +def can_registor_validator(*args, **kwargs): + return {"validatorRegistrations": []} + + +def ipfs_fetch( + deposit_data_root, + public_key, + signature, + withdrawal_credentials, + proofs, +): + return [ + { + "amount": str(32 * 10**9), + "deposit_data_root": deposit_data_root, + "proof": proofs, + "public_key": public_key, + "signature": signature, + "withdrawal_credentials": withdrawal_credentials, + } + ] * 6 + + +def ipfs_fetch_query( + deposit_data_root, + public_key, + signature, + withdrawal_credentials, + proofs, +): + + return [ + ipfs_fetch( + deposit_data_root, public_key, signature, withdrawal_credentials, proofs + ) + ] + + +def get_validators_deposit_root(validatorsDepositRoot, *args, **kwargs): + return { + "validatorRegistrations": [{"validatorsDepositRoot": validatorsDepositRoot}] + } + + +def sw_gql_query(operator): + return [ + select_operators(operator), + select_validators(), + ] + + +def ethereum_gql_query(validatorsDepositRoot, *args, **kwargs): + return [ + can_registor_validator(), + get_validators_deposit_root(validatorsDepositRoot), + ] + + +class TestValidatorController: + async def test_process_low_balance(self): + with patch("oracle.oracle.vote.submit_vote", return_value=None) as vote_mock: + controller = ValidatorsController( + oracle=get_test_oracle(), + ) + await controller.process( + voting_params=ValidatorVotingParameters( + validators_nonce=faker.random_int(1000, 2000), + pool_balance=w3.toWei(31, "ether"), + ), + block_number=BlockNumber(14583706), + ) + assert vote_mock.mock_calls == [] + + async def test_process_success(self): + validators_nonce = faker.random_int(1000, 2000) + + vote = { + "signature": "", + "nonce": validators_nonce, + "validators_deposit_root": faker.eth_proof(), + "deposit_data": [ + { + "operator": faker.eth_address(), + "public_key": faker.eth_public_key(), + "withdrawal_credentials": faker.eth_address(), + "deposit_data_root": faker.eth_proof(), + "deposit_data_signature": faker.eth_signature(), + "proof": [faker.eth_proof()] * 6, + } + ], + } + with patch( + "oracle.oracle.validators.eth1.execute_sw_gql_query", + side_effect=sw_gql_query(operator=vote["deposit_data"][0]["operator"]), + ), patch( + "oracle.oracle.validators.eth1.execute_ethereum_gql_query", + side_effect=ethereum_gql_query( + validatorsDepositRoot=vote["validators_deposit_root"] + ), + ), patch( + "oracle.oracle.validators.eth1.ipfs_fetch", + side_effect=ipfs_fetch_query( + deposit_data_root=vote["deposit_data"][0]["deposit_data_root"], + public_key=vote["deposit_data"][0]["public_key"], + signature=vote["deposit_data"][0]["deposit_data_signature"], + withdrawal_credentials=vote["deposit_data"][0][ + "withdrawal_credentials" + ], + proofs=vote["deposit_data"][0]["proof"], + ), + ), patch( + "oracle.oracle.validators.controller.NETWORK", "goerli" + ), patch( + "oracle.oracle.validators.controller.submit_vote", return_value=None + ) as vote_mock: + controller = ValidatorsController( + oracle=get_test_oracle(), + ) + await controller.process( + voting_params=ValidatorVotingParameters( + validators_nonce=validators_nonce, + pool_balance=w3.toWei(33, "ether"), + ), + block_number=BlockNumber(14583706), + ) + + encoded_data: bytes = w3.codec.encode_abi( + ["uint256", "(address,bytes32,bytes32,bytes,bytes)[]", "bytes32"], + [ + vote["nonce"], + [ + ( + vote["deposit_data"][0]["operator"], + vote["deposit_data"][0]["withdrawal_credentials"], + vote["deposit_data"][0]["deposit_data_root"], + vote["deposit_data"][0]["public_key"], + vote["deposit_data"][0]["deposit_data_signature"], + ) + ], + vote["validators_deposit_root"], + ], + ) + + vote_mock.assert_called() + validator_vote = dict( + oracle=get_test_oracle(), + encoded_data=encoded_data, + vote=vote, + name="validator-vote.json", + ) + vote_mock.assert_called_once_with(**validator_vote) diff --git a/oracle/oracle/validators/types.py b/oracle/oracle/validators/types.py new file mode 100644 index 00000000..04a64a6e --- /dev/null +++ b/oracle/oracle/validators/types.py @@ -0,0 +1,34 @@ +from typing import List, TypedDict + +from eth_typing import ChecksumAddress, HexStr +from web3.types import Wei + + +class ValidatorVotingParameters(TypedDict): + validators_nonce: int + pool_balance: Wei + + +class MerkleDepositData(TypedDict): + public_key: HexStr + signature: HexStr + amount: str + withdrawal_credentials: HexStr + deposit_data_root: HexStr + proof: List[HexStr] + + +class ValidatorDepositData(TypedDict): + operator: ChecksumAddress + public_key: HexStr + withdrawal_credentials: HexStr + deposit_data_root: HexStr + deposit_data_signature: HexStr + proof: List[HexStr] + + +class ValidatorsVote(TypedDict): + nonce: int + validators_deposit_root: HexStr + signature: HexStr + deposit_data: List[ValidatorDepositData] diff --git a/oracle/oracle/vote.py b/oracle/oracle/vote.py index 5ae1d8bf..1451da06 100644 --- a/oracle/oracle/vote.py +++ b/oracle/oracle/vote.py @@ -1,5 +1,6 @@ import json import logging +from typing import Union import backoff import boto3 @@ -8,6 +9,8 @@ from web3 import Web3 from oracle.oracle.distributor.common.types import DistributorVote +from oracle.oracle.rewards.types import RewardVote +from oracle.oracle.validators.types import ValidatorsVote from oracle.settings import NETWORK_CONFIG logger = logging.getLogger(__name__) @@ -17,7 +20,7 @@ def submit_vote( oracle: LocalAccount, encoded_data: bytes, - vote: DistributorVote, + vote: Union[RewardVote, DistributorVote, ValidatorsVote], name: str, ) -> None: """Submits vote to the votes' aggregator.""" diff --git a/oracle/settings.py b/oracle/settings.py index 10e84d7d..d2553351 100644 --- a/oracle/settings.py +++ b/oracle/settings.py @@ -13,7 +13,9 @@ NETWORK_CONFIG = NETWORKS[NETWORK] +REWARD_VOTE_FILENAME = "reward-vote.json" DISTRIBUTOR_VOTE_FILENAME = "distributor-vote.json" +VALIDATOR_VOTE_FILENAME = "validator-vote.json" TEST_VOTE_FILENAME = "test-vote.json" # health server settings diff --git a/pyproject.toml b/pyproject.toml index 57d943e8..3e9fcf3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "oracle" -version = "3.1.0" +version = "3.0.0" description = "StakeWise Oracles are responsible for submitting off-chain data." authors = ["Dmitri Tsumak "] license = "AGPL-3.0-only"