Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use oracle replicas #150

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/common/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ async def get_oracles() -> Oracles:
endpoints = []
public_keys = []
for oracle in config['oracles']:
endpoints.append(oracle['endpoint'])
if endpoint := oracle.get('endpoint'):
replicas = [endpoint]
else:
replicas = oracle['endpoints']
endpoints.append(replicas)
public_keys.append(oracle['public_key'])

if not 1 <= rewards_threshold <= len(config['oracles']):
Expand Down
8 changes: 6 additions & 2 deletions src/common/startup_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ async def wait_for_execution_node() -> None:


async def collect_healthy_oracles() -> list:
oracles = (await get_oracles()).endpoints
endpoints = (await get_oracles()).endpoints

async with ClientSession(timeout=ClientTimeout(60)) as session:
results = await asyncio.gather(
*[_aiohttp_fetch(session=session, url=endpoint) for endpoint in oracles],
*[
_aiohttp_fetch(session=session, url=endpoint)
for replicas in endpoints
for endpoint in replicas
],
return_exceptions=True
)

Expand Down
2 changes: 1 addition & 1 deletion src/common/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Oracles:
validators_threshold: int
exit_signature_recover_threshold: int
public_keys: list[HexStr]
endpoints: list[str]
endpoints: list[list[str]]

validators_approval_batch_limit: int
validators_exit_rotation_batch_limit: int
Expand Down
6 changes: 4 additions & 2 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ async def wait_oracles_signature_update(oracles: Oracles) -> None:
max_time = 10 * float(settings.network_config.SECONDS_PER_BLOCK)
oracle_tasks = (
wait_oracle_signature_update(update_block, endpoint, max_time=max_time)
for endpoint in oracles.endpoints
for replicas in oracles.endpoints
for endpoint in replicas
)
await asyncio.gather(*oracle_tasks)

Expand Down Expand Up @@ -180,7 +181,8 @@ async def update_exit_signatures_periodically(keystores: Keystores):
try:
oracles = await get_oracles()

oracle_endpoint = random.choice(oracles.endpoints) # nosec
oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
Expand Down
33 changes: 30 additions & 3 deletions src/exits/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import dataclasses
import logging
import random
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientError
from eth_typing import ChecksumAddress
from sw_utils.decorators import retry_aiohttp_errors
from web3 import Web3
Expand All @@ -26,9 +28,8 @@ async def send_signature_rotation_requests(
ipfs_hash = None
responses: dict[ChecksumAddress, bytes] = {}
async with aiohttp.ClientSession() as session:
for address, oracle_endpoint in endpoints:
endpoint = urljoin(oracle_endpoint, UPDATE_SIGNATURES_URL_PATH)
response = await send_signature_rotation_request(session, endpoint, payload)
for address, replicas in endpoints:
response = await send_signature_rotation_request_to_replicas(session, replicas, payload)
if ipfs_hash is None:
ipfs_hash = response.ipfs_hash
elif ipfs_hash != response.ipfs_hash:
Expand All @@ -46,11 +47,37 @@ async def send_signature_rotation_requests(
return signatures, ipfs_hash


# pylint: disable=duplicate-code
@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def send_signature_rotation_request_to_replicas(
session: aiohttp.ClientSession, replicas: list[str], payload: dict
) -> OracleApproval:
last_error = None

# Shuffling may help if the first endpoint is slower than others
replicas = random.sample(replicas, len(replicas))

for endpoint in replicas:
try:
return await send_signature_rotation_request(session, endpoint, payload)
except (ClientError, asyncio.TimeoutError) as e:
logger.debug('%s for %s', repr(e), endpoint)
last_error = e

if last_error:
raise last_error

raise RuntimeError('Failed to get response from replicas')


async def send_signature_rotation_request(
session: aiohttp.ClientSession, endpoint: str, payload: dict
) -> OracleApproval:
"""Requests exit signature rotation from single oracle."""
logger.debug('send_signature_rotation_request to %s', endpoint)

endpoint = urljoin(endpoint, UPDATE_SIGNATURES_URL_PATH)

async with session.post(url=endpoint, json=payload) as response:
response.raise_for_status()
data = await response.json()
Expand Down
32 changes: 29 additions & 3 deletions src/validators/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import dataclasses
import json
import logging
import random
from multiprocessing import Pool
from os import listdir
from os.path import isfile, join
Expand Down Expand Up @@ -52,8 +53,10 @@ async def send_approval_requests(oracles: Oracles, request: ApprovalRequest) ->
async with ClientSession(timeout=ClientTimeout(ORACLES_VALIDATORS_TIMEOUT)) as session:
results = await asyncio.gather(
*[
send_approval_request(session=session, endpoint=endpoint, payload=payload)
for address, endpoint in endpoints
send_approval_request_to_replicas(
session=session, replicas=replicas, payload=payload
)
for address, replicas in endpoints
],
return_exceptions=True,
)
Expand Down Expand Up @@ -81,16 +84,39 @@ async def send_approval_requests(oracles: Oracles, request: ApprovalRequest) ->
return signatures, ipfs_hashes[0]


# pylint: disable=duplicate-code
@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def send_approval_request_to_replicas(
session: ClientSession, replicas: list[str], payload: dict
) -> OracleApproval:
last_error = None

# Shuffling may help if the first endpoint is slower than others
replicas = random.sample(replicas, len(replicas))

for endpoint in replicas:
try:
return await send_approval_request(session, endpoint, payload)
except (ClientError, asyncio.TimeoutError) as e:
logger.debug('%s for %s', repr(e), endpoint)
last_error = e

if last_error:
raise last_error

raise RuntimeError('Failed to get response from replicas')


async def send_approval_request(
session: ClientSession, endpoint: str, payload: dict
) -> OracleApproval:
"""Requests approval from single oracle."""
logger.debug('send_approval_request to %s', endpoint)
try:
async with session.post(url=endpoint, json=payload) as response:
response.raise_for_status()
data = await response.json()
except ClientError as e:
except (ClientError, asyncio.TimeoutError) as e:
registry_root = await validators_registry_contract.get_registry_root()
if Web3.to_hex(registry_root) != payload['validators_root']:
raise RegistryRootChangedError from e
Expand Down