Skip to content

Commit

Permalink
Use oracle replicas (#150)
Browse files Browse the repository at this point in the history
* Use oracle replicas

* Shuffle replicas
  • Loading branch information
evgeny-stakewise authored Aug 24, 2023
1 parent abeeca0 commit d53a8ec
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 12 deletions.
6 changes: 5 additions & 1 deletion src/common/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ async def get_oracles() -> Oracles:
endpoints = []
public_keys = []
for oracle in config['oracles']:
endpoints.append(oracle['endpoint'])
if endpoint := oracle.get('endpoint'):
replicas = [endpoint]
else:
replicas = oracle['endpoints']
endpoints.append(replicas)
public_keys.append(oracle['public_key'])

if not 1 <= rewards_threshold <= len(config['oracles']):
Expand Down
8 changes: 6 additions & 2 deletions src/common/startup_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ async def wait_for_execution_node() -> None:


async def collect_healthy_oracles() -> list:
oracles = (await get_oracles()).endpoints
endpoints = (await get_oracles()).endpoints

async with ClientSession(timeout=ClientTimeout(60)) as session:
results = await asyncio.gather(
*[_aiohttp_fetch(session=session, url=endpoint) for endpoint in oracles],
*[
_aiohttp_fetch(session=session, url=endpoint)
for replicas in endpoints
for endpoint in replicas
],
return_exceptions=True
)

Expand Down
2 changes: 1 addition & 1 deletion src/common/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Oracles:
validators_threshold: int
exit_signature_recover_threshold: int
public_keys: list[HexStr]
endpoints: list[str]
endpoints: list[list[str]]

validators_approval_batch_limit: int
validators_exit_rotation_batch_limit: int
Expand Down
6 changes: 4 additions & 2 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ async def wait_oracles_signature_update(oracles: Oracles) -> None:
max_time = 10 * float(settings.network_config.SECONDS_PER_BLOCK)
oracle_tasks = (
wait_oracle_signature_update(update_block, endpoint, max_time=max_time)
for endpoint in oracles.endpoints
for replicas in oracles.endpoints
for endpoint in replicas
)
await asyncio.gather(*oracle_tasks)

Expand Down Expand Up @@ -180,7 +181,8 @@ async def update_exit_signatures_periodically(keystores: Keystores):
try:
oracles = await get_oracles()

oracle_endpoint = random.choice(oracles.endpoints) # nosec
oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
Expand Down
33 changes: 30 additions & 3 deletions src/exits/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import dataclasses
import logging
import random
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientError
from eth_typing import ChecksumAddress
from sw_utils.decorators import retry_aiohttp_errors
from web3 import Web3
Expand All @@ -26,9 +28,8 @@ async def send_signature_rotation_requests(
ipfs_hash = None
responses: dict[ChecksumAddress, bytes] = {}
async with aiohttp.ClientSession() as session:
for address, oracle_endpoint in endpoints:
endpoint = urljoin(oracle_endpoint, UPDATE_SIGNATURES_URL_PATH)
response = await send_signature_rotation_request(session, endpoint, payload)
for address, replicas in endpoints:
response = await send_signature_rotation_request_to_replicas(session, replicas, payload)
if ipfs_hash is None:
ipfs_hash = response.ipfs_hash
elif ipfs_hash != response.ipfs_hash:
Expand All @@ -46,11 +47,37 @@ async def send_signature_rotation_requests(
return signatures, ipfs_hash


# pylint: disable=duplicate-code
@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def send_signature_rotation_request_to_replicas(
session: aiohttp.ClientSession, replicas: list[str], payload: dict
) -> OracleApproval:
last_error = None

# Shuffling may help if the first endpoint is slower than others
replicas = random.sample(replicas, len(replicas))

for endpoint in replicas:
try:
return await send_signature_rotation_request(session, endpoint, payload)
except (ClientError, asyncio.TimeoutError) as e:
logger.debug('%s for %s', repr(e), endpoint)
last_error = e

if last_error:
raise last_error

raise RuntimeError('Failed to get response from replicas')


async def send_signature_rotation_request(
session: aiohttp.ClientSession, endpoint: str, payload: dict
) -> OracleApproval:
"""Requests exit signature rotation from single oracle."""
logger.debug('send_signature_rotation_request to %s', endpoint)

endpoint = urljoin(endpoint, UPDATE_SIGNATURES_URL_PATH)

async with session.post(url=endpoint, json=payload) as response:
response.raise_for_status()
data = await response.json()
Expand Down
32 changes: 29 additions & 3 deletions src/validators/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import dataclasses
import json
import logging
import random
from multiprocessing import Pool
from os import listdir
from os.path import isfile, join
Expand Down Expand Up @@ -52,8 +53,10 @@ async def send_approval_requests(oracles: Oracles, request: ApprovalRequest) ->
async with ClientSession(timeout=ClientTimeout(ORACLES_VALIDATORS_TIMEOUT)) as session:
results = await asyncio.gather(
*[
send_approval_request(session=session, endpoint=endpoint, payload=payload)
for address, endpoint in endpoints
send_approval_request_to_replicas(
session=session, replicas=replicas, payload=payload
)
for address, replicas in endpoints
],
return_exceptions=True,
)
Expand Down Expand Up @@ -81,16 +84,39 @@ async def send_approval_requests(oracles: Oracles, request: ApprovalRequest) ->
return signatures, ipfs_hashes[0]


# pylint: disable=duplicate-code
@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def send_approval_request_to_replicas(
session: ClientSession, replicas: list[str], payload: dict
) -> OracleApproval:
last_error = None

# Shuffling may help if the first endpoint is slower than others
replicas = random.sample(replicas, len(replicas))

for endpoint in replicas:
try:
return await send_approval_request(session, endpoint, payload)
except (ClientError, asyncio.TimeoutError) as e:
logger.debug('%s for %s', repr(e), endpoint)
last_error = e

if last_error:
raise last_error

raise RuntimeError('Failed to get response from replicas')


async def send_approval_request(
session: ClientSession, endpoint: str, payload: dict
) -> OracleApproval:
"""Requests approval from single oracle."""
logger.debug('send_approval_request to %s', endpoint)
try:
async with session.post(url=endpoint, json=payload) as response:
response.raise_for_status()
data = await response.json()
except ClientError as e:
except (ClientError, asyncio.TimeoutError) as e:
registry_root = await validators_registry_contract.get_registry_root()
if Web3.to_hex(registry_root) != payload['validators_root']:
raise RegistryRootChangedError from e
Expand Down

0 comments on commit d53a8ec

Please sign in to comment.