diff --git a/hathor/p2p/connection_slot.py b/hathor/p2p/connection_slot.py new file mode 100644 index 000000000..74cea42b5 --- /dev/null +++ b/hathor/p2p/connection_slot.py @@ -0,0 +1,177 @@ +# Copyright 2021 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import deque +from dataclasses import dataclass +from typing import Optional + +from typing_extensions import assert_never + +from hathor.conf.settings import HathorSettings +from hathor.p2p.peer_endpoint import PeerAddress +from hathor.p2p.protocol import HathorProtocol + + +@dataclass +class ConnectionAllowed: + confirmation: str + + +@dataclass +class ConnectionChanged: + shift: str + + +@dataclass +class ConnectionRejected: + reason: str + + +ConnectionResult = ConnectionAllowed | ConnectionChanged | ConnectionRejected + + +class Slot: + """ Class of a connection pool slot - outgoing, incoming, discovered or + check_entrypoints connections. """ + connection_slot: set[HathorProtocol] + entrypoint_queue_slot: deque[PeerAddress] + type: HathorProtocol.ConnectionType + max_slot_connections: int + queue_size_entrypoints: int + entrypoint_set: set[PeerAddress | None] + + def __init__(self, type: HathorProtocol.ConnectionType, settings: HathorSettings, max_connections: int): + self.type = type + self.connection_slot = set() + self.entrypoint_queue_slot = deque() + self.entrypoint_set = set() + + if max_connections <= 0: + raise ValueError("Slot max number must allow at least one connection") + + max_outgoing: int = settings.P2P_PEER_MAX_OUTGOING_CONNECTIONS + max_incoming: int = settings.P2P_PEER_MAX_INCOMING_CONNECTIONS + max_discovered: int = settings.P2P_PEER_MAX_DISCOVERED_PEERS_CONNECTIONS + max_check_ep: int = settings.P2P_PEER_MAX_CHECK_PEER_CONNECTIONS + + type = self.type + + # For each type of slot, there is a maximum of connections allowed. + match type: + case HathorProtocol.ConnectionType.OUTGOING: + assert max_connections <= max_outgoing + + case HathorProtocol.ConnectionType.INCOMING: + assert max_connections <= max_incoming + + case HathorProtocol.ConnectionType.DISCOVERED: + assert max_connections <= max_discovered + + case HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS: + assert max_connections <= max_check_ep + + case _: + assert_never(type) + + self.max_slot_connections = max_connections + # All slots have the same maximum size. + # Only valid for check_entrypoin + self.queue_size_entrypoints = settings.P2P_QUEUE_SIZE + + def add_connection(self, protocol: HathorProtocol) -> ConnectionAllowed | ConnectionChanged | ConnectionRejected: + """ + Adds connection protocol to the slot. Checks whether the slot is full or not. If full, + disconnects the protocol. If the type is 'check_entrypoints', the returns peers of it + may go to a queue. + + """ + # Make sure connection types match + assert self.type == protocol.connection_type + connection_status: ConnectionResult + + if protocol in self.connection_slot: + return ConnectionRejected("Protocol already in Slot.") + + # If check_entrypoints, there is a set. + # If set minus queue >= 1, a dequeued entrypoint in remove_connection is being connected + # We leave at least one space for it. + if len(self.entrypoint_set) > len(self.entrypoint_queue_slot): + if len(self.connection_slot) == self.max_slot_connections - 1: + protocol.disconnect(reason="Dequeued connection being added. Leaving space for it.") + return ConnectionRejected("Queue is full.") + + # Check if slot is full. If type is check_entrypoints, there is a queue. + if len(self.connection_slot) >= self.max_slot_connections: + if self.type == HathorProtocol.ConnectionType.OUTGOING: + + # The connection must be turned into CHECK_ENTRYPOINTS. + # Will return to on_peer_connect and slot it into check_entrypoints. + protocol.connection_type = HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS + return ConnectionChanged("Outgoing -> Check Entrypoints") + + # Check_EP is disconnected too, as we only queue endpoints of ready/valid peers. + protocol.disconnect(reason="Connection Slot if full. Try again later.") + return ConnectionRejected(f"Slot {self.type} is full") + + # If not full, add to slot if types match. + assert protocol.connection_type == self.type + self.connection_slot.add(protocol) + + connection_status = ConnectionAllowed(f"Type {self.type} added, slot length: {len(self.connection_slot)}") + return connection_status + + def remove_connection(self, protocol: HathorProtocol, revisit: bool = False, + previous_entrypoint: PeerAddress | None = None) -> Optional[PeerAddress] | None: + """ + Removes from given instance the protocol passed. Returns protocol from queue + when disconnection leads to free space in slot. Revisit flag for continuously popping verified entrypoints + from queue and deleting previous entrypoints from set. + """ + if not revisit: + self.connection_slot.discard(protocol) + + if protocol.connection_type == HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS and not revisit: + dequeued_entrypoint = None + # If protocol READY, the peer was verified. We take its EP's to the queue. + # If protocol e.p. not in set, it is a new protocol with new e.p.'s to check. + # If in set, it is a connection from a previously dequeued entrypoint. + + if protocol.connection_state != HathorProtocol.ConnectionState.READY: + return None + + if protocol.entrypoint and protocol.entrypoint.addr not in self.entrypoint_set: + entrypoints = protocol.peer.info.entrypoints + # Unpack the entrypoints and put them in the queue and the set. + for each_entrypoint in entrypoints: + if protocol.entrypoint and each_entrypoint != protocol.entrypoint.addr: + if len(self.entrypoint_queue_slot) == self.queue_size_entrypoints: + # Limit achieved for QUEUE + break + + if each_entrypoint not in self.entrypoint_queue_slot: + self.entrypoint_queue_slot.appendleft(each_entrypoint) + + if each_entrypoint not in self.entrypoint_set: + self.entrypoint_set.add(each_entrypoint) + + # If protocol not READY, it was a timeout. + # Take one from the queue and turn it into a connection. + if self.entrypoint_queue_slot: + if revisit: + self.entrypoint_set.discard(previous_entrypoint) + + dequeued_entrypoint = self.entrypoint_queue_slot.pop() + return dequeued_entrypoint + + return None diff --git a/hathor/p2p/factory.py b/hathor/p2p/factory.py index 832f2e501..c226fb949 100644 --- a/hathor/p2p/factory.py +++ b/hathor/p2p/factory.py @@ -24,7 +24,7 @@ class _HathorLineReceiverFactory(ABC, protocol.Factory): - inbound: bool + connection_type: HathorLineReceiver.ConnectionType def __init__( self, @@ -45,7 +45,7 @@ def buildProtocol(self, addr: IAddress) -> HathorLineReceiver: my_peer=self.my_peer, p2p_manager=self.p2p_manager, use_ssl=self.use_ssl, - inbound=self.inbound, + connection_type=self.connection_type, settings=self._settings ) p.factory = self @@ -55,10 +55,18 @@ def buildProtocol(self, addr: IAddress) -> HathorLineReceiver: class HathorServerFactory(_HathorLineReceiverFactory, protocol.ServerFactory): """ HathorServerFactory is used to generate HathorProtocol objects when a new connection arrives. """ - inbound = True + connection_type = HathorLineReceiver.ConnectionType.INCOMING class HathorClientFactory(_HathorLineReceiverFactory, protocol.ClientFactory): """ HathorClientFactory is used to generate HathorProtocol objects when we connected to another peer. """ - inbound = False + connection_type = HathorLineReceiver.ConnectionType.OUTGOING + + +class HathorDiscoveredFactory(_HathorLineReceiverFactory, protocol.ClientFactory): + """ + HathorDiscoveredFactory is the same as a HathorClientFactory, but the type of connection is set to + discovered, for connection pool slotting. + """ + connection_type = HathorLineReceiver.ConnectionType.DISCOVERED diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index 44dacdcf0..2df1cba3c 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -24,8 +24,10 @@ from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol from twisted.python.failure import Failure from twisted.web.client import Agent +from typing_extensions import assert_never from hathor.conf.settings import HathorSettings +from hathor.p2p.connection_slot import ConnectionChanged, ConnectionRejected, ConnectionResult, Slot from hathor.p2p.netfilter.factory import NetfilterFactory from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer from hathor.p2p.peer_discovery import PeerDiscovery @@ -92,6 +94,11 @@ class GlobalRateLimiter: rate_limiter: RateLimiter + outgoing_slot: Slot + incoming_slot: Slot + bootstrap_slot: Slot + check_entrypoints_slot: Slot + def __init__( self, settings: HathorSettings, @@ -115,7 +122,6 @@ def __init__( self.reactor = reactor self.my_peer = my_peer - # List of address descriptions to listen for new connections (eg: [tcp:8000]) self.listen_address_descriptions: list[str] = [] @@ -129,7 +135,7 @@ def __init__( self.localhost_only = False # Factories. - from hathor.p2p.factory import HathorClientFactory, HathorServerFactory + from hathor.p2p.factory import HathorClientFactory, HathorDiscoveredFactory, HathorServerFactory self.use_ssl = ssl self.server_factory = HathorServerFactory( self.my_peer, p2p_manager=self, use_ssl=self.use_ssl, settings=self._settings @@ -137,6 +143,9 @@ def __init__( self.client_factory = HathorClientFactory( self.my_peer, p2p_manager=self, use_ssl=self.use_ssl, settings=self._settings ) + self.discovered_factory = HathorDiscoveredFactory( + self.my_peer, p2p_manager=self, use_ssl=self.use_ssl, settings=self._settings + ) # Global maximum number of connections. self.max_connections: int = self._settings.PEER_MAX_CONNECTIONS @@ -157,6 +166,18 @@ def __init__( # List of peers connected and ready to communicate. self.connected_peers = {} + # List of connections by each slot + + max_outgoing: int = settings.P2P_PEER_MAX_OUTGOING_CONNECTIONS + max_incoming: int = settings.P2P_PEER_MAX_INCOMING_CONNECTIONS + max_discovered: int = settings.P2P_PEER_MAX_DISCOVERED_PEERS_CONNECTIONS + max_check_ep: int = settings.P2P_PEER_MAX_CHECK_PEER_CONNECTIONS + + self.outgoing_slot = Slot(HathorProtocol.ConnectionType.OUTGOING, settings, max_outgoing) + self.incoming_slot = Slot(HathorProtocol.ConnectionType.INCOMING, settings, max_incoming) + self.bootstrap_slot = Slot(HathorProtocol.ConnectionType.DISCOVERED, settings, max_discovered) + self.check_entrypoints_slot = Slot(HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS, settings, max_check_ep) + # Queue of ready peer-id's used by connect_to_peer_from_connection_queue to choose the next peer to pull a # random new connection from self.new_connection_from_queue = deque() @@ -356,7 +377,7 @@ def _get_peers_count(self) -> PeerConnectionsMetrics: len(self.connecting_peers), len(self.handshaking_peers), len(self.connected_peers), - len(self.verified_peer_storage) + len(self.verified_peer_storage), ) def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: @@ -412,13 +433,47 @@ def on_connection_failure(self, failure: Failure, peer: Optional[UnverifiedPeer def on_peer_connect(self, protocol: HathorProtocol) -> None: """Called when a new connection is established.""" + + # Checks whether connections in the network are at limit. if len(self.connections) >= self.max_connections: self.log.warn('reached maximum number of connections', max_connections=self.max_connections) protocol.disconnect(force=True) return + + connection_allowed: ConnectionResult + # If protocol is added to slot, True. If to Queue or disconnected, False. + # Next block sends the connection to the appropriate slot. + conn_type = protocol.connection_type + + match conn_type: + case HathorProtocol.ConnectionType.OUTGOING: + # Here, it can happen that the protocol changes to Check Entrypoints. + connection_allowed = self.outgoing_slot.add_connection(protocol) + + # If connection changes from outgoing -> check_ep, we add it here. + if isinstance(connection_allowed, ConnectionChanged): + connection_allowed = self.check_entrypoints_slot.add_connection(protocol) + + case HathorProtocol.ConnectionType.INCOMING: + connection_allowed = self.incoming_slot.add_connection(protocol) + + case HathorProtocol.ConnectionType.DISCOVERED: + connection_allowed = self.bootstrap_slot.add_connection(protocol) + + case _: + assert_never(conn_type) + + # Regardless of the slot sent, the total connections increases. + # A connection waiting in queue is not added (yet) to the whole pool, only if another disconnects. + if isinstance(connection_allowed, ConnectionRejected): + return + self.connections.add(protocol) self.handshaking_peers.add(protocol) + # If not queued, connection state is "CONNECTING", as it is not ready yet, added to handshaking. + protocol.connection_state = HathorProtocol.ConnectionState.CONNECTING + self.pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTED, protocol=protocol, @@ -429,11 +484,13 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: """Called when a peer is ready.""" assert protocol.peer is not None self.verified_peer_storage.add_or_replace(protocol.peer) - self.handshaking_peers.remove(protocol) + for conn in self.iter_all_connections(): conn.unverified_peer_storage.remove(protocol.peer) + protocol.connection_state = HathorProtocol.ConnectionState.READY + # we emit the event even if it's a duplicate peer as a matching # NETWORK_PEER_DISCONNECTED will be emitted regardless self.pubsub.publish( @@ -470,6 +527,10 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None: # Notify other peers about this new peer connection. self.relay_peer_to_ready_connections(protocol.peer) + # If it is a connection for checking entrypoint only, we must disconnect now. + if protocol.connection_type == HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS: + protocol.disconnect(reason="READY connection for check_entrypoint slot.") + def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None: """Relay peer to all ready connections.""" for conn in self.iter_ready_connections(): @@ -480,9 +541,40 @@ def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None: def on_peer_disconnect(self, protocol: HathorProtocol) -> None: """Called when a peer disconnect.""" + + # Discard handles case when not in connections. self.connections.discard(protocol) + + # If the protocol discarded is from check_entrypoints slot. + dequeued_ep = None + + # Each conn is from a slot - discard from it as well. + match protocol.connection_type: + case HathorProtocol.ConnectionType.OUTGOING: + self.outgoing_slot.remove_connection(protocol) + + case HathorProtocol.ConnectionType.INCOMING: + self.incoming_slot.remove_connection(protocol) + + case HathorProtocol.ConnectionType.DISCOVERED: + self.bootstrap_slot.remove_connection(protocol) + + case HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS: + dequeued_ep = self.check_entrypoints_slot.remove_connection(protocol) + # For a given ep, check if some verified peer has it. If so, pop it off and restart. + while dequeued_ep: + for peer in self.verified_peer_storage.values(): + if dequeued_ep in peer.info.entrypoints: + dequeued_ep = self.check_entrypoints_slot.remove_connection(protocol, True, dequeued_ep) + break + if dequeued_ep and dequeued_ep not in peer.info.entrypoints: + self.connect_to_endpoint(entrypoint=dequeued_ep.with_id(None)) + case _: + assert_never() + if protocol in self.handshaking_peers: self.handshaking_peers.remove(protocol) + if protocol._peer is not None: peer_id = protocol.peer.id existing_protocol = self.connected_peers.pop(peer_id, None) @@ -499,21 +591,24 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None: elif peer_id in self.new_connection_from_queue: # now we're sure it can be removed from new_connection_from_queue self.new_connection_from_queue.remove(peer_id) + self.pubsub.publish( HathorEvents.NETWORK_PEER_DISCONNECTED, protocol=protocol, peers_count=self._get_peers_count() ) + # SUGGESTION: To make a NETWORK_PEER_DEQUEUED. It would be clearer, since in the case of a dequeue, + # the order of events would be "NETWORK_PEER_READY" and then "NETWORK_PEER_CONNECTED", which is + # the opposite. + def iter_all_connections(self) -> Iterable[HathorProtocol]: """Iterate over all connections.""" - for conn in self.connections: - yield conn + yield from self.connections def iter_ready_connections(self) -> Iterable[HathorProtocol]: """Iterate over ready connections.""" - for conn in self.connected_peers.values(): - yield conn + yield from self.connected_peers.values() def iter_not_ready_endpoints(self) -> Iterable[PeerEndpoint]: """Iterate over not-ready connections.""" @@ -675,13 +770,18 @@ def _connect_to_callback( peer: UnverifiedPeer | PublicPeer | None, endpoint: IStreamClientEndpoint, entrypoint: PeerEndpoint, + discovery_call: bool = False ) -> None: """Called when we successfully connect to a peer.""" if isinstance(protocol, HathorProtocol): + if discovery_call: + protocol.connection_type = HathorProtocol.ConnectionType.DISCOVERED protocol.on_outbound_connect(entrypoint, peer) else: assert isinstance(protocol, TLSMemoryBIOProtocol) assert isinstance(protocol.wrappedProtocol, HathorProtocol) + if discovery_call: + protocol.wrappedProtocol.connection_type = HathorProtocol.ConnectionType.DISCOVERED protocol.wrappedProtocol.on_outbound_connect(entrypoint, peer) self.connecting_peers.pop(endpoint) @@ -690,6 +790,7 @@ def connect_to_endpoint( entrypoint: PeerEndpoint, peer: UnverifiedPeer | PublicPeer | None = None, use_ssl: bool | None = None, + discovery_call: bool = False ) -> None: """ Attempt to connect directly to an endpoint, prefer calling `connect_to_peer` when possible. @@ -699,6 +800,7 @@ def connect_to_endpoint( If `use_ssl` is True, then the connection will be wraped by a TLS. """ + if entrypoint.peer_id is not None and peer is not None and entrypoint.peer_id != peer.id: self.log.debug('skipping because the entrypoint peer_id does not match the actual peer_id', entrypoint=str(entrypoint)) @@ -730,10 +832,16 @@ def connect_to_endpoint( endpoint = entrypoint.addr.to_client_endpoint(self.reactor) factory: IProtocolFactory - if use_ssl: - factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, True, self.client_factory) + if discovery_call: + if use_ssl: + factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, True, self.discovered_factory) + else: + factory = self.discovered_factory else: - factory = self.client_factory + if use_ssl: + factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, True, self.client_factory) + else: + factory = self.client_factory if peer is not None: now = int(self.reactor.seconds()) @@ -742,7 +850,7 @@ def connect_to_endpoint( deferred = endpoint.connect(factory) self.connecting_peers[endpoint] = _ConnectingPeer(entrypoint, deferred) - deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint) + deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint, discovery_call) deferred.addErrback(self.on_connection_failure, peer, endpoint) self.log.info('connecting to', entrypoint=str(entrypoint), peer=str(peer)) self.pubsub.publish( @@ -819,9 +927,15 @@ def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol: assert protocol.peer.id is not None assert protocol.my_peer.id is not None other_connection = self.connected_peers[protocol.peer.id] + _outbound_types = ( + HathorProtocol.ConnectionType.OUTGOING, + HathorProtocol.ConnectionType.DISCOVERED, + HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS, + ) + is_outbound = protocol.connection_type in _outbound_types if bytes(protocol.my_peer.id) > bytes(protocol.peer.id): # connection started by me is kept - if not protocol.inbound: + if is_outbound: # other connection is dropped return other_connection else: @@ -829,7 +943,7 @@ def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol: return protocol else: # connection started by peer is kept - if not protocol.inbound: + if is_outbound: return protocol else: return other_connection diff --git a/hathor/p2p/peer_discovery/bootstrap.py b/hathor/p2p/peer_discovery/bootstrap.py index 23399e2ed..f9b6b302b 100644 --- a/hathor/p2p/peer_discovery/bootstrap.py +++ b/hathor/p2p/peer_discovery/bootstrap.py @@ -37,6 +37,6 @@ def __init__(self, entrypoints: list[PeerEndpoint]): self.entrypoints = entrypoints @override - async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[..., None]) -> None: for entrypoint in self.entrypoints: - connect_to_endpoint(entrypoint) + connect_to_endpoint(entrypoint, discovery_call=True) diff --git a/hathor/p2p/peer_discovery/dns.py b/hathor/p2p/peer_discovery/dns.py index 9ef792a96..f00d4ddfc 100644 --- a/hathor/p2p/peer_discovery/dns.py +++ b/hathor/p2p/peer_discovery/dns.py @@ -53,13 +53,13 @@ def do_lookup_text(self, host: str) -> Deferred[LookupResult]: return lookupText(host) @override - async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[..., None]) -> None: """ Run DNS lookup for host and connect to it This is executed when starting the DNS Peer Discovery and first connecting to the network """ for host in self.hosts: for entrypoint in (await self.dns_seed_lookup(host)): - connect_to_endpoint(entrypoint) + connect_to_endpoint(entrypoint, discovery_call=True) async def dns_seed_lookup(self, host: str) -> set[PeerEndpoint]: """ Run a DNS lookup for TXT, A, and AAAA records and return a list of connection strings. diff --git a/hathor/p2p/peer_discovery/peer_discovery.py b/hathor/p2p/peer_discovery/peer_discovery.py index ae8ee626b..50a2ccc57 100644 --- a/hathor/p2p/peer_discovery/peer_discovery.py +++ b/hathor/p2p/peer_discovery/peer_discovery.py @@ -15,15 +15,13 @@ from abc import ABC, abstractmethod from typing import Callable -from hathor.p2p.peer_endpoint import PeerEndpoint - class PeerDiscovery(ABC): """ Base class to implement peer discovery strategies. """ @abstractmethod - async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[..., None]) -> None: """ This method must discover the peers and call `connect_to_endpoint` for each of them. :param connect_to_endpoint: Function which will be called for each discovered peer. diff --git a/hathor/p2p/protocol.py b/hathor/p2p/protocol.py index 15cf4e8a5..eaf6832c3 100644 --- a/hathor/p2p/protocol.py +++ b/hathor/p2p/protocol.py @@ -72,6 +72,19 @@ class PeerState(Enum): PEER_ID = PeerIdState READY = ReadyState + class ConnectionType(Enum): + """ Types of Connection as inputs for an instance of the Hathor Protocol. """ + OUTGOING = 0 + INCOMING = 1 + DISCOVERED = 2 + CHECK_ENTRYPOINTS = 3 + + class ConnectionState(Enum): + """ State of connection of two peers - either in a slot queue or active. """ + CREATED = 0 + CONNECTING = 1 + READY = 2 + class RateLimitKeys(str, Enum): GLOBAL = 'global' @@ -95,6 +108,7 @@ class WarningFlags(str, Enum): idle_timeout: int sync_version: Optional[SyncVersion] # version chosen to be used on this connection capabilities: set[str] # capabilities received from the peer in HelloState + connection_state: ConnectionState # in slot queue, connecting or ready/in-slot. @property def peer(self) -> PublicPeer: @@ -108,7 +122,7 @@ def __init__( *, settings: HathorSettings, use_ssl: bool, - inbound: bool, + connection_type: ConnectionType, ) -> None: self._settings = settings self.my_peer = my_peer @@ -120,8 +134,12 @@ def __init__( assert self.connections.reactor is not None self.reactor = self.connections.reactor - # Indicate whether it is an inbound connection (true) or an outbound connection (false). - self.inbound = inbound + # Type of Connection + # 0 == Outgoing, 1 == Incoming, 2 == Discovered, 3 == For Checking Entrypoints. + self.connection_type = connection_type + + # Connection State + self.connection_state = HathorProtocol.ConnectionState.CREATED # Maximum period without receiving any messages. self.idle_timeout = self._settings.PEER_IDLE_TIMEOUT diff --git a/hathor_tests/others/test_metrics.py b/hathor_tests/others/test_metrics.py index 8c0e0e1b2..a6eca3164 100644 --- a/hathor_tests/others/test_metrics.py +++ b/hathor_tests/others/test_metrics.py @@ -209,7 +209,7 @@ def build_hathor_protocol(): my_peer=my_peer, p2p_manager=manager.connections, use_ssl=False, - inbound=False, + connection_type=HathorProtocol.ConnectionType.OUTGOING, settings=self._settings ) protocol._peer = PrivatePeer.auto_generated().to_public_peer() diff --git a/hathor_tests/p2p/test_sync_v2.py b/hathor_tests/p2p/test_sync_v2.py index 4cf23dd86..393bc035e 100644 --- a/hathor_tests/p2p/test_sync_v2.py +++ b/hathor_tests/p2p/test_sync_v2.py @@ -6,8 +6,11 @@ from twisted.internet.defer import Deferred, succeed from twisted.python.failure import Failure +from hathor.conf.settings import HathorSettings +from hathor.p2p.connection_slot import Slot from hathor.p2p.messages import ProtocolMessages from hathor.p2p.peer import PrivatePeer +from hathor.p2p.protocol import HathorProtocol from hathor.p2p.states import ReadyState from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient @@ -471,3 +474,260 @@ def test_multiple_unexpected_txs(self) -> None: # force the processing of async code, nothing should break self.simulator.run(0) + + def test_update_of_slots(self) -> None: + + """ + Tests whether the slot mechanism is updating with each extra connection. + """ + # Number of peers to connect + max_total_peers = 5 + + # Create peer list + peerList = [] + for _ in range(max_total_peers): + peerList.append(self.create_peer()) + + # Generate incoming connections - peerList[0] is the target. + in_connList = [] + for i in range(1, max_total_peers): + in_connList.append(FakeConnection(peerList[0], peerList[i])) + + for i in range(len(in_connList)): + self.simulator.add_connection(in_connList[i]) + + self.simulator.run(15) + + # Checks whether it is updating incoming slot + self.assertTrue(len(peerList[0].connections.incoming_slot.connection_slot) == len(in_connList)) + + # Generate outgoing_connections - we'll make a new peer to be the outgoing reference. + # Add new peer: + + newPeer = self.create_peer() + out_connList = [] + for i in range(max_total_peers): + out_connList.append(FakeConnection(peerList[i], newPeer)) + + for i in range(len(out_connList)): + self.simulator.add_connection(out_connList[i]) + + self.simulator.run(15) + + # Checks whether it is updating outgoing_slot + self.assertTrue(len(newPeer.connections.outgoing_slot.connection_slot) == len(out_connList)) + + def test_slot_limit(self) -> None: + """ Tests whether the slots and the pool stop increasing connections after cap is reached. + + Important note: create_peer caps the peer pool at 100. Hence, if more than 100 connections + are opened (if settings.P2P_..._OUTGOING + settings.P2P_...INCOMING _+ ... exceed 100) + then the tests will fail.""" + + # Full-Node: May receive incoming connections, deliver outgoing connections, etc. + # If the total amount of connections exceeds 100, create_peer will not yield more than 100 peers as well. + full_node = self.create_peer() + + # Set the limits for each slot: + max_outgoing_connections = 20 + max_incoming_connections = 20 + max_check_entrypoints = 5 + max_connections = max_check_entrypoints + max_incoming_connections + max_outgoing_connections + + # Change all max_slot values so they do not exceed 100 (limit of create_peer.) + outgoing_slot = full_node.connections.outgoing_slot + outgoing_slot.max_slot_connections = max_outgoing_connections + + incoming_slot = full_node.connections.incoming_slot + incoming_slot.max_slot_connections = max_incoming_connections + + check_ep_slot = full_node.connections.check_entrypoints_slot + check_ep_slot.max_slot_connections = max_check_entrypoints + + # -- Check connections slot -- # + # For each connection type we add more peers than necessary to see if the slot limits function. + # Create peer list for incoming connections + in_peerList = [] + for _ in range(max_incoming_connections + 5): + in_peerList.append(self.create_peer()) + + # Generate incoming connections - full_node is the target. + in_connList = [] + for i in range(0, max_incoming_connections + 5): + in_connList.append(FakeConnection(full_node, in_peerList[i])) + + for i in range(len(in_connList)): + self.simulator.add_connection(in_connList[i]) + + self.simulator.run(10) + + number_incoming_slot = len(incoming_slot.connection_slot) + # Checks whether the connection has capped on its limit size. + self.assertTrue(number_incoming_slot == max_incoming_connections) + + # -- Check outgoing connections slot -- # + # Create peer list for outgoing connections + out_peerList = [] + for _ in range(max_outgoing_connections + 5): + out_peerList.append(self.create_peer()) + + # Generate outgoing connections - out_peerList[i] is the target. + out_connList = [] + for i in range(0, max_outgoing_connections + 5): + out_connList.append(FakeConnection(out_peerList[i], full_node)) + + for i in range(len(out_connList)): + self.simulator.add_connection(out_connList[i]) + + # Assure the outgoing connections cap at the threshold. + self.simulator.run(10) + number_outgoing_slot = len(outgoing_slot.connection_slot) + self.assertTrue(number_outgoing_slot == max_outgoing_connections) + + # Finally, assure the number of connected peers is the same as the sum of all (no discovered connections). + connection_pool = full_node.connections.connections + number_check_ep = len(check_ep_slot.connection_slot) + self.assertTrue(number_outgoing_slot + number_incoming_slot + number_check_ep == len(connection_pool)) + self.assertTrue(len(connection_pool) <= max_connections) + + def test_check_ep_update(self) -> None: + """ + Checks whether the check_entrypoints slot gets updated after outgoing slot full. + """ + + # Full-Node: May receive incoming connections, deliver outgoing connections, etc. + # If the total amount of connections exceeds 100, create_peer will not yield more than 100 peers as well. + full_node = self.create_peer() + + # Set the limits for each slot: + max_outgoing_connections = 20 + max_incoming_connections = 20 + max_check_entrypoints = 5 + + # Change all max_slot values so they do not exceed 100 (limit of create_peer.) + outgoing_slot = full_node.connections.outgoing_slot + outgoing_slot.max_slot_connections = max_outgoing_connections + + incoming_slot = full_node.connections.incoming_slot + incoming_slot.max_slot_connections = max_incoming_connections + + check_ep_slot = full_node.connections.check_entrypoints_slot + check_ep_slot.max_slot_connections = max_check_entrypoints + + out_peerList = [] + for _ in range(max_outgoing_connections): + out_peerList.append(self.create_peer()) + + # Generate outgoing connections - out_peerList[i] is the target. + out_connList = [] + for i in range(0, max_outgoing_connections): + out_connList.append(FakeConnection(out_peerList[i], full_node)) + + for i in range(len(out_connList)): + self.simulator.add_connection(out_connList[i]) + + # Assure the outgoing connections cap at the threshold. + self.simulator.run(10) + + # Now, increase in one more connection, and see if the protocol is check_entrypoints type. + new_peer = self.create_peer() + out_peerList.append(new_peer) + conn = FakeConnection(new_peer, full_node) + out_connList.append(conn) + self.simulator.add_connection(conn) + + self.simulator.run(2) + + # Check if indeed a connection was updated into check_entrypoints after outgoing full + self.assertTrue(len(check_ep_slot.connection_slot) == 1) + + # Let's keep adding more outgoing connections to the full node until it caps the check_entrypoints. + for _ in range(max_check_entrypoints + 5): + out_peerList.append(self.create_peer()) + + # Generate outgoing connections - out_peerList[i] is the target. + out_connList = [] + for i in range(max_check_entrypoints + 5): + out_connList.append(FakeConnection(out_peerList[i], full_node)) + + for i in range(len(out_connList)): + self.simulator.add_connection(out_connList[i]) + + self.simulator.run(2) + + # Amount of established connections in check_ep slot. + amount_check_ep_conn = len(check_ep_slot.connection_slot) + + # It passed through the cap of check_entrypoints. It mush be capped. + self.assertTrue(amount_check_ep_conn == max_check_entrypoints) + + # Assert the numbers add up to the max of connections. + total_conn = len(full_node.connections.connections) + self.assertTrue(amount_check_ep_conn + max_outgoing_connections == total_conn) + + def test_check_ep_overflow(self) -> None: + # Full-Node: May receive incoming connections, deliver outgoing connections, etc. + # If the total amount of connections exceeds 100, create_peer will not yield more than 100 peers as well. + full_node = self.create_peer() + + # Set the limits for each slot: + max_outgoing_connections = 20 + max_incoming_connections = 20 + max_check_entrypoints = 5 + + # Change all max_slot values so they do not exceed 100 (limit of create_peer.) + outgoing_slot = full_node.connections.outgoing_slot + outgoing_slot.max_slot_connections = max_outgoing_connections + + incoming_slot = full_node.connections.incoming_slot + incoming_slot.max_slot_connections = max_incoming_connections + + check_ep_slot = full_node.connections.check_entrypoints_slot + check_ep_slot.max_slot_connections = max_check_entrypoints + out_peerList = [] + for _ in range(max_outgoing_connections): + out_peerList.append(self.create_peer()) + + # Generate outgoing connections - out_peerList[i] is the target. + out_connList = [] + for i in range(0, max_outgoing_connections): + out_connList.append(FakeConnection(out_peerList[i], full_node)) + + for i in range(len(out_connList)): + if out_connList[i] not in self.simulator._connections: + self.simulator.add_connection(out_connList[i]) + + # Assure the outgoing connections cap at the threshold. + self.simulator.run(10) + + # Let's keep adding more outgoing connections to the full node until it caps the check_entrypoints. + for _ in range(max_check_entrypoints): + out_peerList.append(self.create_peer()) + + # Generate outgoing connections - out_peerList[i] is the target. + out_connList = [] + for i in range(max_check_entrypoints): + out_connList.append(FakeConnection(out_peerList[i], full_node)) + + for i in range(len(out_connList)): + if out_connList[i] not in self.simulator._connections: + self.simulator.add_connection(out_connList[i]) + + self.simulator.run(2) + + # Amount of established connections in check_ep slot. + amount_check_ep_conn = len(check_ep_slot.connection_slot) + + self.simulator.run(30) + + for conn in full_node.connections.check_entrypoints_slot.connection_slot: + self.assertTrue(conn.connection_state == HathorProtocol.ConnectionState.CONNECTING) + + # It passed through the cap of check_entrypoints. It mush be capped. + self.assertTrue(amount_check_ep_conn == max_check_entrypoints) + + def test_example_usage_of_Slot(self) -> None: + _settings = HathorSettings(P2PKH_VERSION_BYTE=bytes(1), MULTISIG_VERSION_BYTE=bytes(1), NETWORK_NAME="testnet") + max_connections = 15 + slot = Slot(HathorProtocol.ConnectionType.OUTGOING, _settings, max_connections=max_connections) + assert slot.max_slot_connections == max_connections diff --git a/hathorlib/hathorlib/conf/settings.py b/hathorlib/hathorlib/conf/settings.py index cb8829247..8a8db77f1 100644 --- a/hathorlib/hathorlib/conf/settings.py +++ b/hathorlib/hathorlib/conf/settings.py @@ -366,7 +366,16 @@ def _validate_token_deposit_percentage(cls, token_deposit_percentage: float) -> PEER_CONNECTION_RETRY_MAX_RETRY_INTERVAL: int = 300 # Number max of connections in the p2p network - PEER_MAX_CONNECTIONS: int = 125 + PEER_MAX_CONNECTIONS: int = 85 + + # Max Number of each connection slot (int): + P2P_PEER_MAX_INCOMING_CONNECTIONS: int = 50 + P2P_PEER_MAX_OUTGOING_CONNECTIONS: int = 60 + P2P_PEER_MAX_DISCOVERED_PEERS_CONNECTIONS: int = 10 + P2P_PEER_MAX_CHECK_PEER_CONNECTIONS: int = 5 + + # Queue size for each connection slot: + P2P_QUEUE_SIZE: int = 100 # Maximum period without receiving any messages from ther peer (in seconds). PEER_IDLE_TIMEOUT: int = 60