diff --git a/raiden/blockchain/events.py b/raiden/blockchain/events.py index 35592a69da4..de1d6ecd54a 100644 --- a/raiden/blockchain/events.py +++ b/raiden/blockchain/events.py @@ -1,6 +1,6 @@ import time from dataclasses import dataclass -from typing import Tuple +from typing import Callable, Tuple import structlog from eth_utils import to_canonical_address, to_checksum_address @@ -294,6 +294,8 @@ def __init__( contract_manager ) + self._listeners: List[Callable] = [] + def fetch_logs_in_batch(self, target_block_number: BlockNumber) -> Optional[PollResult]: """Poll the smart contract events for a limited number of blocks to avoid read timeouts (issue #3558). @@ -562,6 +564,9 @@ def _query_and_track( ) result.extend(decoded_events) + for listener in self._listeners: + listener(decoded_events) + # Go through the results and create the child filters, if necessary. event_filter = new_filters_from_events(decoded_events) @@ -575,6 +580,14 @@ def _query_and_track( return result, max_request_duration - def uninstall_all_event_listeners(self) -> None: + def stop(self) -> None: with self._filters_lock: self._address_to_abi = {} + + del self._listeners[:] + + def register_listener(self, listener: Callable) -> None: + self._listeners.append(listener) + + def unregister_listener(self, listener: Callable) -> None: + self._listeners.remove(listener) diff --git a/raiden/network/transport/matrix/transport.py b/raiden/network/transport/matrix/transport.py index a3d88aa808b..ed3eb1c2b63 100644 --- a/raiden/network/transport/matrix/transport.py +++ b/raiden/network/transport/matrix/transport.py @@ -389,6 +389,8 @@ def __init__(self, config: MatrixTransportConfig, environment: Environment) -> N self._config = config self._environment = environment self._raiden_service: Optional["RaidenService"] = None + # the addresses for which we're currently trying to initialize WebRTC channels + self._web_rtc_channel_inits: Set[Address] = set() if config.server == MATRIX_AUTO_SELECT_SERVER: homeserver_candidates = config.available_servers @@ -569,6 +571,15 @@ def start( # type: ignore self._schedule_new_greenlet(self._set_presence, UserPresence.ONLINE) + chain_state = views.state_from_raiden(raiden_service) + for neighbour in views.all_neighbour_nodes(chain_state): + self.health_check_web_rtc(neighbour) + + def health_check_web_rtc(self, partner: Address) -> None: + if not self._web_rtc_manager.has_ready_channel(partner): + # TODO: initialize WebRTC for the partner address + pass + def _set_presence(self, state: UserPresence) -> None: waiting_period = randint(SET_PRESENCE_INTERVAL // 4, SET_PRESENCE_INTERVAL) gevent.wait( # pylint: disable=gevent-disable-wait @@ -1158,6 +1169,8 @@ def _send_raw( ) -> None: assert self._raiden_service is not None, "_raiden_service not set" + self.health_check_web_rtc(receiver_address) + user_ids: Set[UserID] = set() if self._web_rtc_manager.has_ready_channel(receiver_address): communication_medium = CommunicationMedium.WEB_RTC @@ -1201,8 +1214,7 @@ def _send_raw( return def _maybe_initialize_web_rtc(self, address: Address) -> None: - - if self._stop_event.ready(): + if self._stop_event.ready() or address in self._web_rtc_channel_inits: return assert self._raiden_service is not None, "_raiden_service not set" @@ -1216,7 +1228,11 @@ def _maybe_initialize_web_rtc(self, address: Address) -> None: partner_address=to_checksum_address(address), ) # initiate web rtc handling - self._schedule_new_greenlet(self._initialize_web_rtc, address) + self._web_rtc_channel_inits.add(address) + try: + self._schedule_new_greenlet(self._initialize_web_rtc, address) + finally: + self._web_rtc_channel_inits.remove(address) def _initialize_web_rtc(self, partner_address: Address) -> None: assert self._raiden_service is not None, "_raiden_service not set" diff --git a/raiden/raiden_service.py b/raiden/raiden_service.py index 262e825aa04..adbc1a8740b 100644 --- a/raiden/raiden_service.py +++ b/raiden/raiden_service.py @@ -21,7 +21,7 @@ from raiden.api.python import RaidenAPI from raiden.api.rest import APIServer, RestAPI from raiden.blockchain.decode import blockchainevent_to_statechange -from raiden.blockchain.events import BlockchainEvents +from raiden.blockchain.events import BlockchainEvents, DecodedEvent from raiden.blockchain.filters import RaidenContractFilter from raiden.constants import ( ABSENT_SECRET, @@ -132,6 +132,7 @@ typecheck, ) from raiden.utils.upgrades import UpgradeManager +from raiden_contracts.constants import ChannelEvent from raiden_contracts.contract_manager import ContractManager log = structlog.get_logger(__name__) @@ -526,7 +527,7 @@ def stop(self) -> None: assert ( self.blockchain_events ), f"The blockchain_events has to be set by the start. node:{self!r}" - self.blockchain_events.uninstall_all_event_listeners() + self.blockchain_events.stop() # Close storage DB to release internal DB lock assert ( @@ -774,6 +775,7 @@ def _synchronize_with_blockchain(self) -> None: block_batch_size_config=self.config.blockchain.block_batch_size_config, node_address=self.address, ) + blockchain_events.register_listener(self._blockchain_event_listener) self.last_log_block = last_block_number self.last_log_time = time.monotonic() @@ -790,6 +792,17 @@ def _synchronize_with_blockchain(self) -> None: self.alarm.register_callback(self._best_effort_synchronize) + def _blockchain_event_listener(self, events: List[DecodedEvent]) -> None: + for event in events: + args = event.event_data["args"] + if event.event_data["event"] == ChannelEvent.OPENED: + other = ( + args["participant1"] + if args["participant1"] != self.address + else args["participant2"] + ) + self.transport.health_check_web_rtc(other) + def _start_alarm_task(self) -> None: """Start the alarm task.