Skip to content

Commit

Permalink
Merge testnet-3 into add-exit-sig-deadline
Browse files Browse the repository at this point in the history
Signed-off-by: cyc60 <[email protected]>
  • Loading branch information
cyc60 committed Aug 24, 2023
2 parents fed6eea + ca2b878 commit 8bbaf8f
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 35 deletions.
33 changes: 31 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ bandit = { version = "==1.7.4", extras = ["toml"] }
black = { version = "==23.1.0", extras = ["d"] }
pyinstaller = "==5.7.0"
faker = "==18.10.1"
flake8-datetime-utcnow-plugin = "==0.1.2"
flake8-print = "==5.0.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
6 changes: 5 additions & 1 deletion src/common/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ async def get_oracles() -> Oracles:
endpoints = []
public_keys = []
for oracle in config['oracles']:
endpoints.append(oracle['endpoint'])
if endpoint := oracle.get('endpoint'):
replicas = [endpoint]
else:
replicas = oracle['endpoints']
endpoints.append(replicas)
public_keys.append(oracle['public_key'])

if not 1 <= rewards_threshold <= len(config['oracles']):
Expand Down
8 changes: 6 additions & 2 deletions src/common/startup_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ async def wait_for_execution_node() -> None:


async def collect_healthy_oracles() -> list:
oracles = (await get_oracles()).endpoints
endpoints = (await get_oracles()).endpoints

async with ClientSession(timeout=ClientTimeout(60)) as session:
results = await asyncio.gather(
*[_aiohttp_fetch(session=session, url=endpoint) for endpoint in oracles],
*[
_aiohttp_fetch(session=session, url=endpoint)
for replicas in endpoints
for endpoint in replicas
],
return_exceptions=True
)

Expand Down
2 changes: 1 addition & 1 deletion src/common/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Oracles:
validators_threshold: int
exit_signature_recover_threshold: int
public_keys: list[HexStr]
endpoints: list[str]
endpoints: list[list[str]]

validators_approval_batch_limit: int
validators_exit_rotation_batch_limit: int
Expand Down
6 changes: 4 additions & 2 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ async def wait_oracles_signature_update(oracles: Oracles) -> None:
max_time = 10 * float(settings.network_config.SECONDS_PER_BLOCK)
oracle_tasks = (
wait_oracle_signature_update(update_block, endpoint, max_time=max_time)
for endpoint in oracles.endpoints
for replicas in oracles.endpoints
for endpoint in replicas
)
await asyncio.gather(*oracle_tasks)

Expand Down Expand Up @@ -180,7 +181,8 @@ async def update_exit_signatures_periodically(keystores: Keystores):
try:
oracles = await get_oracles()

oracle_endpoint = random.choice(oracles.endpoints) # nosec
oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
Expand Down
33 changes: 30 additions & 3 deletions src/exits/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
import logging
import random
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientError
from eth_typing import ChecksumAddress
from sw_utils.decorators import retry_aiohttp_errors
from web3 import Web3
Expand All @@ -25,9 +27,8 @@ async def send_signature_rotation_requests(
ipfs_hash = None
responses: dict[ChecksumAddress, bytes] = {}
async with aiohttp.ClientSession() as session:
for address, oracle_endpoint in endpoints:
endpoint = urljoin(oracle_endpoint, UPDATE_SIGNATURES_URL_PATH)
response = await send_signature_rotation_request(session, endpoint, payload)
for address, replicas in endpoints:
response = await send_signature_rotation_request_to_replicas(session, replicas, payload)
if ipfs_hash is None:
ipfs_hash = response.ipfs_hash
elif ipfs_hash != response.ipfs_hash:
Expand All @@ -45,11 +46,37 @@ async def send_signature_rotation_requests(
return signatures, ipfs_hash


# pylint: disable=duplicate-code
@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def send_signature_rotation_request_to_replicas(
session: aiohttp.ClientSession, replicas: list[str], payload: dict
) -> OracleApproval:
last_error = None

# Shuffling may help if the first endpoint is slower than others
replicas = random.sample(replicas, len(replicas))

for endpoint in replicas:
try:
return await send_signature_rotation_request(session, endpoint, payload)
except (ClientError, asyncio.TimeoutError) as e:
logger.debug('%s for %s', repr(e), endpoint)
last_error = e

if last_error:
raise last_error

raise RuntimeError('Failed to get response from replicas')


async def send_signature_rotation_request(
session: aiohttp.ClientSession, endpoint: str, payload: dict
) -> OracleApproval:
"""Requests exit signature rotation from single oracle."""
logger.debug('send_signature_rotation_request to %s', endpoint)

endpoint = urljoin(endpoint, UPDATE_SIGNATURES_URL_PATH)

async with session.post(url=endpoint, json=payload) as response:
response.raise_for_status()
data = await response.json()
Expand Down
13 changes: 8 additions & 5 deletions src/validators/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
get_eth1_withdrawal_credentials,
is_valid_deposit_data_signature,
)
from sw_utils.typings import Bytes32
from web3 import Web3
from web3.types import EventData, Wei

Expand Down Expand Up @@ -225,6 +226,7 @@ async def register_single_validator(
validator: Validator,
approval: OraclesApproval,
update_state_call: HexStr | None,
validators_registry_root: Bytes32,
) -> None:
"""Registers single validator."""
if settings.network not in ETH_NETWORKS:
Expand All @@ -237,7 +239,7 @@ async def register_single_validator(
logger.info('Submitting registration transaction')
register_call_args = [
(
approval.validators_registry_root,
validators_registry_root,
tx_validator,
approval.signatures,
approval.ipfs_hash,
Expand All @@ -262,7 +264,8 @@ async def register_multiple_validator(
tree: StandardMerkleTree,
validators: list[Validator],
approval: OraclesApproval,
update_call: HexStr | None,
update_state_call: HexStr | None,
validators_registry_root: Bytes32,
) -> None:
"""Registers multiple validators."""
if settings.network not in ETH_NETWORKS:
Expand All @@ -283,7 +286,7 @@ async def register_multiple_validator(
logger.info('Submitting registration transaction')
register_call_args = [
(
approval.validators_registry_root,
validators_registry_root,
b''.join(tx_validators),
approval.signatures,
approval.ipfs_hash,
Expand All @@ -292,12 +295,12 @@ async def register_multiple_validator(
multi_proof.proof_flags,
multi_proof.proof,
]
if update_call is not None:
if update_state_call is not None:
register_call = vault_contract.encodeABI(
fn_name='registerValidators',
args=register_call_args,
)
tx = await vault_contract.functions.multicall([update_call, register_call]).transact()
tx = await vault_contract.functions.multicall([update_state_call, register_call]).transact()
else:
tx = await vault_contract.functions.registerValidators(*register_call_args).transact()

Expand Down
52 changes: 38 additions & 14 deletions src/validators/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from sw_utils.typings import Bytes32
from web3 import Web3
from web3.types import BlockNumber, Wei

Expand Down Expand Up @@ -70,7 +71,27 @@ async def register_validators(keystores: Keystores, deposit_data: DepositData) -
)
return

oracles_approval = await get_oracles_approval(oracles, keystores, validators)
registry_root = None

while True:
latest_registry_root = await validators_registry_contract.get_registry_root()

if not registry_root or registry_root != latest_registry_root:
registry_root = latest_registry_root
logger.debug('Fetched latest validators registry root: %s', registry_root)

oracles_request = await create_approval_request(
registry_root=registry_root,
oracles=oracles,
keystores=keystores,
validators=validators,
)

try:
oracles_approval = await get_oracles_approval(oracles=oracles, request=oracles_request)
break
except Exception as e:
logger.exception(e)

if len(validators) == 1:
validator = validators[0]
Expand All @@ -79,30 +100,30 @@ async def register_validators(keystores: Keystores, deposit_data: DepositData) -
validator=validator,
approval=oracles_approval,
update_state_call=update_state_call,
validators_registry_root=registry_root,
)
logger.info('Successfully registered validator with public key %s', validator.public_key)

if len(validators) > 1:
await register_multiple_validator(
deposit_data.tree, validators, oracles_approval, update_state_call
tree=deposit_data.tree,
validators=validators,
approval=oracles_approval,
update_state_call=update_state_call,
validators_registry_root=registry_root,
)
pub_keys = ', '.join([val.public_key for val in validators])
logger.info('Successfully registered validators with public keys %s', pub_keys)


async def get_oracles_approval(
oracles: Oracles, keystores: Keystores, validators: list[Validator]
) -> OraclesApproval:
"""Fetches approval from oracles."""

# get latest registry root
registry_root = await validators_registry_contract.get_registry_root()
logger.debug('Fetched latest validators registry root: %s', registry_root)
async def create_approval_request(
oracles: Oracles, keystores: Keystores, validators: list[Validator], registry_root: Bytes32
) -> ApprovalRequest:
"""Generate validator registration request data"""

# get next validator index for exit signature
latest_public_keys = await get_latest_network_validator_public_keys()
validator_index = NetworkValidatorCrud().get_next_validator_index(list(latest_public_keys))
start_validator_index = validator_index
logger.debug('Next validator index for exit signature: %d', validator_index)

# fetch current fork data
Expand Down Expand Up @@ -138,18 +159,21 @@ async def get_oracles_approval(
request.exit_signature_shards.append(shards.exit_signatures)

validator_index += 1
return request


async def get_oracles_approval(oracles: Oracles, request: ApprovalRequest) -> OraclesApproval:
"""Fetches approval from oracles."""
# send approval request to oracles
signatures, ipfs_hash = await send_approval_requests(oracles, request)
logger.info(
'Fetched oracles approval for validators: count=%d, start index=%d',
len(validators),
start_validator_index,
len(request.public_keys),
request.validator_index,
)
return OraclesApproval(
signatures=signatures,
ipfs_hash=ipfs_hash,
validators_registry_root=registry_root,
)


Expand Down
2 changes: 1 addition & 1 deletion src/validators/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ class ExitSignatureShards:

@dataclass
class OraclesApproval:
validators_registry_root: Bytes32
signatures: bytes
ipfs_hash: str


@dataclass
# pylint: disable-next=too-many-instance-attributes
class ApprovalRequest:
validator_index: int
vault_address: ChecksumAddress
Expand Down
Loading

0 comments on commit 8bbaf8f

Please sign in to comment.