Skip to content

Commit

Permalink
raiden: perform health checks and enable WebRTC
Browse files Browse the repository at this point in the history
The health checks are done whenever

- a channel is opened
- a message is about to be sent
- the transport is started

Fixes #7151.
  • Loading branch information
Ivan Stanković authored and fredo committed Jun 21, 2021
1 parent ab33f87 commit cf97e5c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
17 changes: 15 additions & 2 deletions raiden/blockchain/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import time
from dataclasses import dataclass
from typing import Tuple
from typing import Callable, Tuple

import structlog
from eth_utils import to_canonical_address, to_checksum_address
Expand Down Expand Up @@ -294,6 +294,8 @@ def __init__(
contract_manager
)

self._listeners: List[Callable] = []

def fetch_logs_in_batch(self, target_block_number: BlockNumber) -> Optional[PollResult]:
"""Poll the smart contract events for a limited number of blocks to
avoid read timeouts (issue #3558).
Expand Down Expand Up @@ -562,6 +564,9 @@ def _query_and_track(
)
result.extend(decoded_events)

for listener in self._listeners:
listener(decoded_events)

# Go through the results and create the child filters, if necessary.
event_filter = new_filters_from_events(decoded_events)

Expand All @@ -575,6 +580,14 @@ def _query_and_track(

return result, max_request_duration

def uninstall_all_event_listeners(self) -> None:
def stop(self) -> None:
with self._filters_lock:
self._address_to_abi = {}

del self._listeners[:]

def register_listener(self, listener: Callable) -> None:
self._listeners.append(listener)

def unregister_listener(self, listener: Callable) -> None:
self._listeners.remove(listener)
25 changes: 22 additions & 3 deletions raiden/network/transport/matrix/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ def __init__(self, config: MatrixTransportConfig, environment: Environment) -> N
self._config = config
self._environment = environment
self._raiden_service: Optional["RaidenService"] = None
# the addresses for which we're currently trying to initialize WebRTC channels
self._web_rtc_channel_inits: Set[Address] = set()

if config.server == MATRIX_AUTO_SELECT_SERVER:
homeserver_candidates = config.available_servers
Expand Down Expand Up @@ -569,6 +571,15 @@ def start( # type: ignore

self._schedule_new_greenlet(self._set_presence, UserPresence.ONLINE)

chain_state = views.state_from_raiden(raiden_service)
for neighbour in views.all_neighbour_nodes(chain_state):
self.health_check_web_rtc(neighbour)

def health_check_web_rtc(self, partner: Address) -> None:
if not self._web_rtc_manager.has_ready_channel(partner):
# TODO: initialize WebRTC for the partner address
pass

def _set_presence(self, state: UserPresence) -> None:
waiting_period = randint(SET_PRESENCE_INTERVAL // 4, SET_PRESENCE_INTERVAL)
gevent.wait( # pylint: disable=gevent-disable-wait
Expand Down Expand Up @@ -1158,6 +1169,8 @@ def _send_raw(
) -> None:
assert self._raiden_service is not None, "_raiden_service not set"

self.health_check_web_rtc(receiver_address)

user_ids: Set[UserID] = set()
if self._web_rtc_manager.has_ready_channel(receiver_address):
communication_medium = CommunicationMedium.WEB_RTC
Expand Down Expand Up @@ -1201,8 +1214,7 @@ def _send_raw(
return

def _maybe_initialize_web_rtc(self, address: Address) -> None:

if self._stop_event.ready():
if self._stop_event.ready() or address in self._web_rtc_channel_inits:
return

assert self._raiden_service is not None, "_raiden_service not set"
Expand All @@ -1216,7 +1228,14 @@ def _maybe_initialize_web_rtc(self, address: Address) -> None:
partner_address=to_checksum_address(address),
)
# initiate web rtc handling
self._schedule_new_greenlet(self._initialize_web_rtc, address)
self._schedule_new_greenlet(self._wrapped_initialize_web_rtc, address)

def _wrapped_initialize_web_rtc(self, address: Address) -> None:
self._web_rtc_channel_inits.add(address)
try:
return self._initialize_web_rtc(address)
finally:
self._web_rtc_channel_inits.remove(address)

def _initialize_web_rtc(self, partner_address: Address) -> None:
assert self._raiden_service is not None, "_raiden_service not set"
Expand Down
17 changes: 15 additions & 2 deletions raiden/raiden_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from raiden.api.python import RaidenAPI
from raiden.api.rest import APIServer, RestAPI
from raiden.blockchain.decode import blockchainevent_to_statechange
from raiden.blockchain.events import BlockchainEvents
from raiden.blockchain.events import BlockchainEvents, DecodedEvent
from raiden.blockchain.filters import RaidenContractFilter
from raiden.constants import (
ABSENT_SECRET,
Expand Down Expand Up @@ -132,6 +132,7 @@
typecheck,
)
from raiden.utils.upgrades import UpgradeManager
from raiden_contracts.constants import ChannelEvent
from raiden_contracts.contract_manager import ContractManager

log = structlog.get_logger(__name__)
Expand Down Expand Up @@ -526,7 +527,7 @@ def stop(self) -> None:
assert (
self.blockchain_events
), f"The blockchain_events has to be set by the start. node:{self!r}"
self.blockchain_events.uninstall_all_event_listeners()
self.blockchain_events.stop()

# Close storage DB to release internal DB lock
assert (
Expand Down Expand Up @@ -774,6 +775,7 @@ def _synchronize_with_blockchain(self) -> None:
block_batch_size_config=self.config.blockchain.block_batch_size_config,
node_address=self.address,
)
blockchain_events.register_listener(self._blockchain_event_listener)

self.last_log_block = last_block_number
self.last_log_time = time.monotonic()
Expand All @@ -790,6 +792,17 @@ def _synchronize_with_blockchain(self) -> None:

self.alarm.register_callback(self._best_effort_synchronize)

def _blockchain_event_listener(self, events: List[DecodedEvent]) -> None:
for event in events:
args = event.event_data["args"]
if event.event_data["event"] == ChannelEvent.OPENED:
other = (
args["participant1"]
if args["participant1"] != self.address
else args["participant2"]
)
self.transport.health_check_web_rtc(other)

def _start_alarm_task(self) -> None:
"""Start the alarm task.
Expand Down

0 comments on commit cf97e5c

Please sign in to comment.