diff --git a/src/lean_spec/subspecs/containers/state/state.py b/src/lean_spec/subspecs/containers/state/state.py index 1e41e845..4b537759 100644 --- a/src/lean_spec/subspecs/containers/state/state.py +++ b/src/lean_spec/subspecs/containers/state/state.py @@ -1,5 +1,7 @@ """State Container for the Lean Ethereum consensus specification.""" +from __future__ import annotations + from typing import AbstractSet, Iterable from lean_spec.subspecs.ssz.hash import hash_tree_root diff --git a/src/lean_spec/subspecs/metrics/__init__.py b/src/lean_spec/subspecs/metrics/__init__.py index c6f23172..36803521 100644 --- a/src/lean_spec/subspecs/metrics/__init__.py +++ b/src/lean_spec/subspecs/metrics/__init__.py @@ -7,38 +7,26 @@ from .registry import ( REGISTRY, - attestations_invalid, attestations_produced, - attestations_received, - attestations_valid, block_processing_time, blocks_processed, blocks_proposed, - current_slot, finalized_slot, generate_metrics, head_slot, justified_slot, - peers_connected, - reorgs, validators_count, ) __all__ = [ "REGISTRY", - "attestations_invalid", "attestations_produced", - "attestations_received", - "attestations_valid", "block_processing_time", "blocks_processed", "blocks_proposed", - "current_slot", "finalized_slot", "generate_metrics", "head_slot", "justified_slot", - "peers_connected", - "reorgs", "validators_count", ] diff --git a/src/lean_spec/subspecs/metrics/registry.py b/src/lean_spec/subspecs/metrics/registry.py index 7996270e..59cea842 100644 --- a/src/lean_spec/subspecs/metrics/registry.py +++ b/src/lean_spec/subspecs/metrics/registry.py @@ -30,12 +30,6 @@ registry=REGISTRY, ) -current_slot = Gauge( - "lean_current_slot", - "Current time slot", - registry=REGISTRY, -) - justified_slot = Gauge( "lean_justified_slot", "Latest justified slot", @@ -71,48 +65,6 @@ registry=REGISTRY, ) -# ----------------------------------------------------------------------------- -# Attestations -# ----------------------------------------------------------------------------- - -attestations_received = Counter( - "lean_attestations_received_total", - "Total attestations received", - registry=REGISTRY, -) - -attestations_valid = Counter( - "lean_attestations_valid_total", - "Valid attestations", - registry=REGISTRY, -) - -attestations_invalid = Counter( - "lean_attestations_invalid_total", - "Invalid attestations", - registry=REGISTRY, -) - -# ----------------------------------------------------------------------------- -# Network -# ----------------------------------------------------------------------------- - -peers_connected = Gauge( - "lean_peers_connected", - "Connected peers", - registry=REGISTRY, -) - -# ----------------------------------------------------------------------------- -# Consensus Events -# ----------------------------------------------------------------------------- - -reorgs = Counter( - "lean_reorgs_total", - "Chain reorganizations", - registry=REGISTRY, -) - # ----------------------------------------------------------------------------- # Validator Production # ----------------------------------------------------------------------------- diff --git a/src/lean_spec/subspecs/networking/discovery/__init__.py b/src/lean_spec/subspecs/networking/discovery/__init__.py index 7da25f85..0b1bfbd8 100644 --- a/src/lean_spec/subspecs/networking/discovery/__init__.py +++ b/src/lean_spec/subspecs/networking/discovery/__init__.py @@ -18,46 +18,10 @@ from .codec import ( DiscoveryMessage, - MessageDecodingError, - MessageEncodingError, decode_message, encode_message, - generate_request_id, ) from .config import DiscoveryConfig -from .crypto import ( - AES_KEY_SIZE, - COMPRESSED_PUBKEY_SIZE, - CTR_IV_SIZE, - GCM_NONCE_SIZE, - GCM_TAG_SIZE, - ID_SIGNATURE_SIZE, - UNCOMPRESSED_PUBKEY_SIZE, - aes_ctr_decrypt, - aes_ctr_encrypt, - aes_gcm_decrypt, - aes_gcm_encrypt, - ecdh_agree, - generate_secp256k1_keypair, - pubkey_to_compressed, - pubkey_to_uncompressed, - sign_id_nonce, - verify_id_nonce_signature, -) -from .handshake import ( - HandshakeError, - HandshakeManager, - HandshakeResult, - HandshakeState, - PendingHandshake, -) -from .keys import ( - DISCV5_KEY_AGREEMENT_INFO, - SESSION_KEY_SIZE, - compute_node_id, - derive_keys, - derive_keys_from_pubkey, -) from .messages import ( MAX_REQUEST_ID_LENGTH, PROTOCOL_ID, @@ -65,145 +29,45 @@ Distance, FindNode, IdNonce, - IPv4, - IPv6, MessageType, Nodes, Nonce, - PacketFlag, Ping, Pong, - Port, RequestId, - StaticHeader, TalkReq, TalkResp, - WhoAreYouAuthdata, -) -from .packet import ( - HANDSHAKE_HEADER_SIZE, - MESSAGE_AUTHDATA_SIZE, - STATIC_HEADER_SIZE, - WHOAREYOU_AUTHDATA_SIZE, - HandshakeAuthdata, - MessageAuthdata, - PacketHeader, - PacketType, - decode_handshake_authdata, - decode_message_authdata, - decode_packet_header, - decode_whoareyou_authdata, - decrypt_message, - encode_handshake_authdata, - encode_message_authdata, - encode_packet, - encode_whoareyou_authdata, - generate_id_nonce, - generate_nonce, -) -from .packet import ( - WhoAreYouAuthdata as WhoAreYouAuthdataDecoded, ) -from .routing import KBucket, NodeEntry, RoutingTable, log2_distance, xor_distance +from .routing import NodeEntry, RoutingTable from .service import DiscoveryService, LookupResult -from .session import BondCache, Session, SessionCache -from .transport import DiscoveryTransport __all__ = [ - # Config + # High-level service + "DiscoveryService", "DiscoveryConfig", - # Messages - "MAX_REQUEST_ID_LENGTH", + "LookupResult", + # Message types (for protocol interaction) + "DiscoveryMessage", + "encode_message", + "decode_message", + # Routing + "NodeEntry", + "RoutingTable", + # Message types (commonly needed) + "Ping", + "Pong", + "FindNode", + "Nodes", + "TalkReq", + "TalkResp", + # Constants (commonly needed) "PROTOCOL_ID", "PROTOCOL_VERSION", + "MAX_REQUEST_ID_LENGTH", + # Types "Distance", "IdNonce", - "IPv4", - "IPv6", "Nonce", - "Port", "RequestId", "MessageType", - "PacketFlag", - "FindNode", - "Nodes", - "Ping", - "Pong", - "TalkReq", - "TalkResp", - "StaticHeader", - "WhoAreYouAuthdata", - # Routing - "KBucket", - "NodeEntry", - "RoutingTable", - "log2_distance", - "xor_distance", - # Crypto - "AES_KEY_SIZE", - "COMPRESSED_PUBKEY_SIZE", - "CTR_IV_SIZE", - "GCM_NONCE_SIZE", - "GCM_TAG_SIZE", - "ID_SIGNATURE_SIZE", - "UNCOMPRESSED_PUBKEY_SIZE", - "aes_ctr_encrypt", - "aes_ctr_decrypt", - "aes_gcm_encrypt", - "aes_gcm_decrypt", - "ecdh_agree", - "generate_secp256k1_keypair", - "pubkey_to_compressed", - "pubkey_to_uncompressed", - "sign_id_nonce", - "verify_id_nonce_signature", - # Keys - "DISCV5_KEY_AGREEMENT_INFO", - "SESSION_KEY_SIZE", - "compute_node_id", - "derive_keys", - "derive_keys_from_pubkey", - # Codec - "DiscoveryMessage", - "MessageDecodingError", - "MessageEncodingError", - "encode_message", - "decode_message", - "generate_request_id", - # Packet - "STATIC_HEADER_SIZE", - "MESSAGE_AUTHDATA_SIZE", - "WHOAREYOU_AUTHDATA_SIZE", - "HANDSHAKE_HEADER_SIZE", - "PacketType", - "PacketHeader", - "MessageAuthdata", - "WhoAreYouAuthdataDecoded", - "HandshakeAuthdata", - "encode_packet", - "decode_packet_header", - "encode_message_authdata", - "decode_message_authdata", - "encode_whoareyou_authdata", - "decode_whoareyou_authdata", - "encode_handshake_authdata", - "decode_handshake_authdata", - "decrypt_message", - "generate_nonce", - "generate_id_nonce", - # Session - "Session", - "SessionCache", - "BondCache", - # Handshake - "HandshakeState", - "PendingHandshake", - "HandshakeResult", - "HandshakeError", - "HandshakeManager", - # Transport - "DiscoveryTransport", - # Service - "DiscoveryService", - "LookupResult", ] diff --git a/src/lean_spec/subspecs/networking/gossipsub/__init__.py b/src/lean_spec/subspecs/networking/gossipsub/__init__.py index ca8c368b..21f0a78e 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/__init__.py +++ b/src/lean_spec/subspecs/networking/gossipsub/__init__.py @@ -22,14 +22,9 @@ - Ethereum P2P: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md """ -from lean_spec.subspecs.networking.varint import decode_varint, encode_varint - from ..transport import PeerId from .behavior import ( GossipsubBehavior, - GossipsubMessageEvent, - GossipsubPeerEvent, - PeerState, ) from .control import ( ControlMessage, @@ -39,57 +34,11 @@ IWant, Prune, ) -from .mcache import ( - CacheEntry, - MessageCache, - SeenCache, -) -from .mesh import ( - FanoutEntry, - MeshState, - TopicMesh, -) -from .message import GossipsubMessage, SnappyDecompressor +from .message import GossipsubMessage from .parameters import ( GossipsubParameters, ) -from .rpc import ( - RPC, - PeerInfo, - SubOpts, - create_graft_rpc, - create_ihave_rpc, - create_iwant_rpc, - create_prune_rpc, - create_publish_rpc, - create_subscription_rpc, -) -from .rpc import ( - ControlGraft as RPCControlGraft, -) -from .rpc import ( - ControlIDontWant as RPCControlIDontWant, -) -from .rpc import ( - ControlIHave as RPCControlIHave, -) -from .rpc import ( - ControlIWant as RPCControlIWant, -) -from .rpc import ( - ControlMessage as RPCControlMessage, -) -from .rpc import ( - ControlPrune as RPCControlPrune, -) -from .rpc import ( - Message as RPCMessage, -) from .topic import ( - ATTESTATION_TOPIC_NAME, - BLOCK_TOPIC_NAME, - ENCODING_POSTFIX, - TOPIC_PREFIX, ForkMismatchError, GossipTopic, TopicKind, @@ -102,62 +51,26 @@ ) __all__ = [ - # Behavior + # Behavior (main entry point) "GossipsubBehavior", - "GossipsubMessageEvent", - "GossipsubPeerEvent", - "PeerState", + "GossipsubParameters", # Message "GossipsubMessage", - "SnappyDecompressor", - # Topic + # Topic (commonly needed for Ethereum) "GossipTopic", "TopicKind", - "TOPIC_PREFIX", - "ENCODING_POSTFIX", - "BLOCK_TOPIC_NAME", - "ATTESTATION_TOPIC_NAME", - "ForkMismatchError", "format_topic_string", "parse_topic_string", - # Parameters - "GossipsubParameters", - # Control + "ForkMismatchError", + # Types + "MessageId", + "TopicId", + "PeerId", + # Control messages (for custom handlers) "ControlMessage", "Graft", "Prune", "IHave", "IWant", "IDontWant", - # RPC (wire protocol encoding) - "RPC", - "SubOpts", - "RPCMessage", - "RPCControlMessage", - "RPCControlGraft", - "RPCControlPrune", - "RPCControlIHave", - "RPCControlIWant", - "RPCControlIDontWant", - "PeerInfo", - "create_subscription_rpc", - "create_graft_rpc", - "create_prune_rpc", - "create_ihave_rpc", - "create_iwant_rpc", - "create_publish_rpc", - "encode_varint", - "decode_varint", - # Mesh - "MeshState", - "TopicMesh", - "FanoutEntry", - # Cache - "MessageCache", - "SeenCache", - "CacheEntry", - # Types - "MessageId", - "PeerId", - "TopicId", ] diff --git a/src/lean_spec/subspecs/networking/gossipsub/behavior.py b/src/lean_spec/subspecs/networking/gossipsub/behavior.py index b9437c78..f08a5869 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/behavior.py +++ b/src/lean_spec/subspecs/networking/gossipsub/behavior.py @@ -56,7 +56,9 @@ from __future__ import annotations import asyncio +import hashlib import logging +import struct import time from collections.abc import Callable from dataclasses import dataclass, field @@ -65,25 +67,16 @@ from lean_spec.subspecs.networking.config import ( MESSAGE_DOMAIN_VALID_SNAPPY, MESSAGE_ID_SIZE, - PRUNE_BACKOFF, ) from lean_spec.subspecs.networking.gossipsub.mcache import MessageCache, SeenCache from lean_spec.subspecs.networking.gossipsub.mesh import MeshState from lean_spec.subspecs.networking.gossipsub.parameters import GossipsubParameters from lean_spec.subspecs.networking.gossipsub.rpc import ( RPC, - ControlGraft, - ControlIHave, - ControlIWant, - ControlMessage, - ControlPrune, Message, - SubOpts, - create_graft_rpc, create_subscription_rpc, ) from lean_spec.subspecs.networking.transport import PeerId -from lean_spec.subspecs.networking.varint import decode_varint, encode_varint from lean_spec.types import Bytes20 if TYPE_CHECKING: @@ -162,29 +155,6 @@ class GossipsubBehavior: - Message propagation - Lazy gossip (IHAVE/IWANT) - Heartbeat-based maintenance - - Usage:: - - behavior = GossipsubBehavior(params=GossipsubParameters()) - await behavior.start() - - # Subscribe to topics - behavior.subscribe("/leanconsensus/0x12345678/block/ssz_snappy") - - # Add peer connection - await behavior.add_peer(peer_id, stream) - - # Publish message - await behavior.publish(topic, data) - - # Process events - while True: - event = await behavior.get_next_event() - if event is None: - break - if isinstance(event, GossipsubMessageEvent): - # Handle received message - pass """ params: GossipsubParameters = field(default_factory=GossipsubParameters) @@ -243,6 +213,8 @@ def subscribe(self, topic: str) -> None: Args: topic: Topic string to subscribe to. """ + from lean_spec.subspecs.networking.gossipsub.stream import broadcast_subscription + if topic in self._subscriptions: return @@ -253,7 +225,7 @@ def subscribe(self, topic: str) -> None: # Notify all connected peers if self._running: - asyncio.create_task(self._broadcast_subscription(topic, subscribe=True)) + asyncio.create_task(broadcast_subscription(self, topic, subscribe=True)) def unsubscribe(self, topic: str) -> None: """ @@ -268,6 +240,8 @@ def unsubscribe(self, topic: str) -> None: Args: topic: Topic string to unsubscribe from. """ + from lean_spec.subspecs.networking.gossipsub.stream import broadcast_subscription + if topic not in self._subscriptions: return @@ -278,15 +252,17 @@ def unsubscribe(self, topic: str) -> None: # Notify all connected peers if self._running: - asyncio.create_task(self._broadcast_subscription(topic, subscribe=False)) + asyncio.create_task(broadcast_subscription(self, topic, subscribe=False)) async def start(self) -> None: """Start the gossipsub behavior (heartbeat loop).""" + from lean_spec.subspecs.networking.gossipsub.heartbeat import heartbeat_loop + if self._running: return self._running = True - self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + self._heartbeat_task = asyncio.create_task(heartbeat_loop(self)) logger.info("[GS %x] GossipsubBehavior started", self._instance_id % 0xFFFF) async def stop(self) -> None: @@ -334,6 +310,8 @@ async def add_peer(self, peer_id: PeerId, stream: Any, *, inbound: bool = False) stream: Stream for RPC exchange. inbound: True if this is an inbound stream (peer opened to us). """ + from lean_spec.subspecs.networking.gossipsub.stream import receive_loop + existing = self._peers.get(peer_id) if inbound: @@ -355,7 +333,7 @@ async def add_peer(self, peer_id: PeerId, stream: Any, *, inbound: bool = False) # Start receiving RPCs on the inbound stream. # Track the task so we can cancel it on stop(). - receive_task = asyncio.create_task(self._receive_loop(peer_id, stream)) + receive_task = asyncio.create_task(receive_loop(self, peer_id, stream)) state.receive_task = receive_task # Yield to allow the receive loop task to start before we return. @@ -427,6 +405,8 @@ async def publish(self, topic: str, data: bytes) -> None: topic: Topic to publish to. data: Message payload. """ + from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage + # Create message msg = Message(topic=topic, data=data) @@ -442,8 +422,6 @@ async def publish(self, topic: str, data: bytes) -> None: self.seen_cache.add(msg_id, time.time()) # Add to message cache - from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage - cache_msg = GossipsubMessage(topic=topic.encode("utf-8"), raw_data=data) self.message_cache.put(topic, cache_msg) @@ -535,417 +513,11 @@ async def get_next_event( # Internal Methods # ========================================================================= - async def _heartbeat_loop(self) -> None: - """Background heartbeat for mesh maintenance.""" - interval = self.params.heartbeat_interval_secs - - while self._running: - try: - await asyncio.sleep(interval) - await self._heartbeat() - except asyncio.CancelledError: - break - except Exception as e: - logger.warning("Heartbeat error: %s", e) - - async def _heartbeat(self) -> None: - """ - Perform heartbeat maintenance. - - The heartbeat: - - 1. Maintains mesh sizes (GRAFT if < D_low, PRUNE if > D_high) - 2. Sends IHAVE gossip to non-mesh peers - 3. Ages the message cache - """ - now = time.time() - - for topic in self._subscriptions: - await self._maintain_mesh(topic, now) - await self._emit_gossip(topic) - - # Age message cache - self.message_cache.shift() - - # Clean up seen cache - self.seen_cache.cleanup(now) - - async def _maintain_mesh(self, topic: str, now: float) -> None: - """Maintain mesh size for a topic.""" - mesh_peers = self.mesh.get_mesh_peers(topic) - mesh_size = len(mesh_peers) - - # Find eligible peers (subscribed to topic, not in mesh, and can send to). - # - # IMPORTANT: Only consider peers we can actually send to. - # If we don't have an outbound stream yet (peer just connected, stream - # setup still in progress), skip them. They'll become eligible once - # their outbound stream is established. - eligible = [] - for peer_id, state in self._peers.items(): - # Must have outbound stream to send GRAFT - if state.outbound_stream is None: - continue - if topic in state.subscriptions and peer_id not in mesh_peers: - # Check backoff - backoff_until = state.backoff.get(topic, 0) - if now >= backoff_until: - eligible.append(peer_id) - - # GRAFT if too few peers - if mesh_size < self.params.d_low and eligible: - needed = self.params.d - mesh_size - to_graft = eligible[: min(needed, len(eligible))] - - for peer_id in to_graft: - self.mesh.add_to_mesh(topic, peer_id) - - # Send GRAFT - rpc = create_graft_rpc([topic]) - for peer_id in to_graft: - await self._send_rpc(peer_id, rpc) - - logger.debug("GRAFT %d peers for topic %s", len(to_graft), topic) - - # PRUNE if too many peers - elif mesh_size > self.params.d_high: - # Keep peers with best scores (for now, just take first D) - to_prune = list(mesh_peers)[self.params.d :] - - for peer_id in to_prune: - self.mesh.remove_from_mesh(topic, peer_id) - - # Send PRUNE - prune_rpc = RPC( - control=ControlMessage(prune=[ControlPrune(topic_id=topic, backoff=PRUNE_BACKOFF)]) - ) - for peer_id in to_prune: - await self._send_rpc(peer_id, prune_rpc) - - logger.debug("PRUNE %d peers for topic %s", len(to_prune), topic) - - async def _emit_gossip(self, topic: str) -> None: - """Send IHAVE gossip to non-mesh peers.""" - # Get message IDs from cache - msg_ids = self.message_cache.get_gossip_ids(topic) - if not msg_ids: - return - - # Get all connected peers subscribed to this topic (with outbound streams). - # - # Only include peers we can actually send to. Peers without outbound - # streams yet (still setting up) are skipped. - all_topic_peers = { - p - for p, state in self._peers.items() - if topic in state.subscriptions and state.outbound_stream is not None - } - - # Select D_lazy non-mesh peers - gossip_peers = self.mesh.select_peers_for_gossip(topic, all_topic_peers) - if not gossip_peers: - return - - # Send IHAVE - msg_id_bytes = [ - msg_id if isinstance(msg_id, bytes) else bytes(msg_id) for msg_id in msg_ids - ] - ihave = ControlIHave(topic_id=topic, message_ids=msg_id_bytes) - rpc = RPC(control=ControlMessage(ihave=[ihave])) - - for peer_id in gossip_peers: - await self._send_rpc(peer_id, rpc) - - logger.debug( - "IHAVE %d messages to %d peers for topic %s", len(msg_ids), len(gossip_peers), topic - ) - - async def _broadcast_subscription(self, topic: str, subscribe: bool) -> None: - """Broadcast subscription change to all peers.""" - rpc = create_subscription_rpc([topic], subscribe) - - # Only send to peers we have outbound streams for - for peer_id, state in self._peers.items(): - if state.outbound_stream is not None: - await self._send_rpc(peer_id, rpc) - - # If subscribing, send GRAFT to eligible peers (must have outbound stream) - if subscribe: - eligible = [ - p - for p, s in self._peers.items() - if topic in s.subscriptions and s.outbound_stream is not None - ][: self.params.d] - - if eligible: - graft_rpc = create_graft_rpc([topic]) - for peer_id in eligible: - self.mesh.add_to_mesh(topic, peer_id) - await self._send_rpc(peer_id, graft_rpc) - async def _send_rpc(self, peer_id: PeerId, rpc: RPC) -> None: """Send an RPC to a peer on the outbound stream.""" - state = self._peers.get(peer_id) - if state is None or state.outbound_stream is None: - # Expected during stream setup - peer might only have inbound stream yet. - # The outbound stream will be established shortly. - logger.debug("Cannot send RPC to %s: no outbound stream yet", peer_id) - return - - try: - data = rpc.encode() - # Length-prefix the RPC (varint + data) - frame = encode_varint(len(data)) + data - logger.debug( - "Sending RPC to %s: %d bytes (subs=%d, msgs=%d)", - peer_id, - len(frame), - len(rpc.subscriptions), - len(rpc.publish), - ) - state.outbound_stream.write(frame) - await state.outbound_stream.drain() - logger.debug("RPC sent and drained to %s", peer_id) - state.last_rpc_time = time.time() - except Exception as e: - logger.warning("Failed to send RPC to %s: %s", peer_id, e) - - async def _receive_loop(self, peer_id: PeerId, stream: Any) -> None: - """Receive and process RPCs from a peer.""" - buffer = bytearray() - logger.debug("Starting receive loop for peer %s", peer_id) - - try: - while self._running and peer_id in self._peers: - try: - chunk = await stream.read() - if not chunk: - logger.debug("Receive loop got empty chunk from %s, exiting", peer_id) - break - logger.debug("Received %d bytes from %s", len(chunk), peer_id) - buffer.extend(chunk) - - # Try to parse complete RPCs - while buffer: - try: - # Read length prefix - if len(buffer) < 1: - break - length, varint_size = decode_varint(bytes(buffer), 0) - if len(buffer) < varint_size + length: - break - - # Extract and parse RPC - rpc_data = bytes(buffer[varint_size : varint_size + length]) - buffer = buffer[varint_size + length :] - - rpc = RPC.decode(rpc_data) - logger.debug( - "Received RPC from %s: subs=%d, msgs=%d, ctrl=%s", - peer_id, - len(rpc.subscriptions), - len(rpc.publish), - bool(rpc.control), - ) - await self._handle_rpc(peer_id, rpc) - except Exception as e: - logger.warning("Error parsing RPC from %s: %s", peer_id, e) - break - - except asyncio.CancelledError: - break - except Exception as e: - logger.warning("Error receiving from %s: %s", peer_id, e) - break - - finally: - # Clean up peer on disconnect - await self.remove_peer(peer_id) + from lean_spec.subspecs.networking.gossipsub.stream import send_rpc - async def _handle_rpc(self, peer_id: PeerId, rpc: RPC) -> None: - """Handle an incoming RPC.""" - state = self._peers.get(peer_id) - if state is None: - return - - # Process subscriptions - for sub in rpc.subscriptions: - await self._handle_subscription(peer_id, sub) - - # Process published messages - for msg in rpc.publish: - await self._handle_message(peer_id, msg) - - # Process control messages - if rpc.control: - await self._handle_control(peer_id, rpc.control) - - async def _handle_subscription(self, peer_id: PeerId, sub: SubOpts) -> None: - """Handle a subscription change from a peer.""" - state = self._peers.get(peer_id) - if state is None: - return - - if sub.subscribe: - state.subscriptions.add(sub.topic_id) - logger.debug("Peer %s subscribed to %s", peer_id, sub.topic_id) - else: - state.subscriptions.discard(sub.topic_id) - # Remove from mesh if they unsubscribed - self.mesh.remove_from_mesh(sub.topic_id, peer_id) - logger.debug("Peer %s unsubscribed from %s", peer_id, sub.topic_id) - - # Emit event - await self._event_queue.put( - GossipsubPeerEvent(peer_id=peer_id, topic=sub.topic_id, subscribed=sub.subscribe) - ) - - async def _handle_message(self, peer_id: PeerId, msg: Message) -> None: - """Handle a published message from a peer.""" - if not msg.topic: - return - - # Compute message ID - msg_id = self._compute_message_id(msg.topic.encode("utf-8"), msg.data) - - # Check if already seen - if self.seen_cache.has(msg_id): - return - - # Mark as seen - self.seen_cache.add(msg_id, time.time()) - - # Add to cache - from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage - - cache_msg = GossipsubMessage(topic=msg.topic.encode("utf-8"), raw_data=msg.data) - self.message_cache.put(msg.topic, cache_msg) - - # Forward to mesh peers (excluding sender) - if msg.topic in self._subscriptions: - mesh_peers = self.mesh.get_mesh_peers(msg.topic) - forward_rpc = RPC(publish=[msg]) - - for mesh_peer in mesh_peers: - if mesh_peer != peer_id: - await self._send_rpc(mesh_peer, forward_rpc) - - # Emit event to application - event = GossipsubMessageEvent( - peer_id=peer_id, topic=msg.topic, data=msg.data, message_id=msg_id - ) - await self._event_queue.put(event) - - # Call handler if set - if self._message_handler: - self._message_handler(event) - - logger.debug( - "Received message %s from %s on topic %s", msg_id.hex()[:8], peer_id, msg.topic - ) - - async def _handle_control(self, peer_id: PeerId, control: ControlMessage) -> None: - """Handle control messages from a peer.""" - state = self._peers.get(peer_id) - if state is None: - return - - # Handle GRAFT - for graft in control.graft: - await self._handle_graft(peer_id, graft) - - # Handle PRUNE - for prune in control.prune: - await self._handle_prune(peer_id, prune) - - # Handle IHAVE - for ihave in control.ihave: - await self._handle_ihave(peer_id, ihave) - - # Handle IWANT - for iwant in control.iwant: - await self._handle_iwant(peer_id, iwant) - - async def _handle_graft(self, peer_id: PeerId, graft: ControlGraft) -> None: - """Handle a GRAFT request from a peer.""" - topic = graft.topic_id - - # Check if we're subscribed to the topic - if topic not in self._subscriptions: - # Send PRUNE - we're not subscribed - prune = ControlPrune(topic_id=topic, backoff=PRUNE_BACKOFF) - prune_rpc = RPC(control=ControlMessage(prune=[prune])) - await self._send_rpc(peer_id, prune_rpc) - return - - # Check mesh size - mesh_peers = self.mesh.get_mesh_peers(topic) - if len(mesh_peers) >= self.params.d_high: - # Mesh is full, send PRUNE - prune = ControlPrune(topic_id=topic, backoff=PRUNE_BACKOFF) - prune_rpc = RPC(control=ControlMessage(prune=[prune])) - await self._send_rpc(peer_id, prune_rpc) - return - - # Accept GRAFT - self.mesh.add_to_mesh(topic, peer_id) - logger.debug("Accepted GRAFT from %s for topic %s", peer_id, topic) - - async def _handle_prune(self, peer_id: PeerId, prune: ControlPrune) -> None: - """Handle a PRUNE notification from a peer.""" - topic = prune.topic_id - state = self._peers.get(peer_id) - - # Remove from mesh - self.mesh.remove_from_mesh(topic, peer_id) - - # Set backoff - if state and prune.backoff > 0: - state.backoff[topic] = time.time() + prune.backoff - - logger.debug( - "Received PRUNE from %s for topic %s (backoff=%ds)", peer_id, topic, prune.backoff - ) - - async def _handle_ihave(self, peer_id: PeerId, ihave: ControlIHave) -> None: - """Handle an IHAVE advertisement from a peer.""" - # Find messages we don't have - wanted = [] - for msg_id in ihave.message_ids: - # Convert bytes to Bytes20 for cache lookup - if len(msg_id) != 20: - continue - msg_id_typed = Bytes20(msg_id) - if not self.seen_cache.has(msg_id_typed) and not self.message_cache.has(msg_id_typed): - wanted.append(msg_id) - - if not wanted: - return - - # Send IWANT - iwant_rpc = RPC(control=ControlMessage(iwant=[ControlIWant(message_ids=wanted)])) - await self._send_rpc(peer_id, iwant_rpc) - - logger.debug("Sent IWANT for %d messages from %s", len(wanted), peer_id) - - async def _handle_iwant(self, peer_id: PeerId, iwant: ControlIWant) -> None: - """Handle an IWANT request from a peer.""" - messages = [] - - for msg_id in iwant.message_ids: - # Convert bytes to Bytes20 for cache lookup - if len(msg_id) != 20: - continue - msg_id_typed = Bytes20(msg_id) - cached = self.message_cache.get(msg_id_typed) - if cached: - messages.append(Message(topic=cached.topic.decode("utf-8"), data=cached.raw_data)) - - if messages: - rpc = RPC(publish=messages) - await self._send_rpc(peer_id, rpc) - - logger.debug("Sent %d messages in response to IWANT from %s", len(messages), peer_id) + await send_rpc(self, peer_id, rpc) def _compute_message_id(self, topic: bytes, data: bytes) -> Bytes20: """ @@ -953,9 +525,6 @@ def _compute_message_id(self, topic: bytes, data: bytes) -> Bytes20: message_id = SHA256(domain + uint64_le(len(topic)) + topic + data)[:20] """ - import hashlib - import struct - # Domain byte for message-id isolation (0x01 for valid snappy) domain = bytes(MESSAGE_DOMAIN_VALID_SNAPPY) diff --git a/src/lean_spec/subspecs/networking/gossipsub/control.py b/src/lean_spec/subspecs/networking/gossipsub/control.py index 815a0167..9841d946 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/control.py +++ b/src/lean_spec/subspecs/networking/gossipsub/control.py @@ -138,13 +138,6 @@ class ControlMessage(StrictBaseModel): Multiple control messages are batched into a single RPC for efficiency. An RPC can contain any combination of control message types. - - Example:: - - control = ControlMessage( - grafts=[Graft(topic_id="blocks")], - ihaves=[IHave(topic_id="blocks", message_ids=[msg_id])], - ) """ grafts: list[Graft] = [] diff --git a/src/lean_spec/subspecs/networking/gossipsub/handlers.py b/src/lean_spec/subspecs/networking/gossipsub/handlers.py new file mode 100644 index 00000000..22129157 --- /dev/null +++ b/src/lean_spec/subspecs/networking/gossipsub/handlers.py @@ -0,0 +1,239 @@ +""" +Gossipsub Message Handlers +========================= + +Handlers for incoming gossipsub RPCs and control messages. + +This module processes: + +- Subscription changes (peer joined/left topic) +- Published messages (validate, dedupe, forward) +- Control messages (GRAFT, PRUNE, IHAVE, IWANT) + +References: +----------- +- Gossipsub v1.1: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md +""" + +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING + +from lean_spec.subspecs.networking.config import PRUNE_BACKOFF +from lean_spec.subspecs.networking.gossipsub.rpc import ( + RPC, + ControlGraft, + ControlIHave, + ControlIWant, + ControlMessage, + ControlPrune, + Message, + SubOpts, +) +from lean_spec.subspecs.networking.transport import PeerId +from lean_spec.types import Bytes20 + +if TYPE_CHECKING: + from lean_spec.subspecs.networking.gossipsub.behavior import ( + GossipsubBehavior, + ) + +logger = logging.getLogger(__name__) + + +async def handle_rpc(behavior: GossipsubBehavior, peer_id: PeerId, rpc: RPC) -> None: + """Handle an incoming RPC.""" + state = behavior._peers.get(peer_id) + if state is None: + return + + # Process subscriptions + for sub in rpc.subscriptions: + await handle_subscription(behavior, peer_id, sub) + + # Process published messages + for msg in rpc.publish: + await handle_message(behavior, peer_id, msg) + + # Process control messages + if rpc.control: + await handle_control(behavior, peer_id, rpc.control) + + +async def handle_subscription(behavior: GossipsubBehavior, peer_id: PeerId, sub: SubOpts) -> None: + """Handle a subscription change from a peer.""" + from lean_spec.subspecs.networking.gossipsub.behavior import GossipsubPeerEvent + + state = behavior._peers.get(peer_id) + if state is None: + return + + if sub.subscribe: + state.subscriptions.add(sub.topic_id) + logger.debug("Peer %s subscribed to %s", peer_id, sub.topic_id) + else: + state.subscriptions.discard(sub.topic_id) + # Remove from mesh if they unsubscribed + behavior.mesh.remove_from_mesh(sub.topic_id, peer_id) + logger.debug("Peer %s unsubscribed from %s", peer_id, sub.topic_id) + + # Emit event + await behavior._event_queue.put( + GossipsubPeerEvent(peer_id=peer_id, topic=sub.topic_id, subscribed=sub.subscribe) + ) + + +async def handle_message(behavior: GossipsubBehavior, peer_id: PeerId, msg: Message) -> None: + """Handle a published message from a peer.""" + from lean_spec.subspecs.networking.gossipsub.behavior import GossipsubMessageEvent + from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage + + if not msg.topic: + return + + # Compute message ID + msg_id = behavior._compute_message_id(msg.topic.encode("utf-8"), msg.data) + + # Check if already seen + if behavior.seen_cache.has(msg_id): + return + + # Mark as seen + behavior.seen_cache.add(msg_id, time.time()) + + # Add to cache + cache_msg = GossipsubMessage(topic=msg.topic.encode("utf-8"), raw_data=msg.data) + behavior.message_cache.put(msg.topic, cache_msg) + + # Forward to mesh peers (excluding sender) + if msg.topic in behavior._subscriptions: + mesh_peers = behavior.mesh.get_mesh_peers(msg.topic) + forward_rpc = RPC(publish=[msg]) + + for mesh_peer in mesh_peers: + if mesh_peer != peer_id: + await behavior._send_rpc(mesh_peer, forward_rpc) + + # Emit event to application + event = GossipsubMessageEvent( + peer_id=peer_id, topic=msg.topic, data=msg.data, message_id=msg_id + ) + await behavior._event_queue.put(event) + + # Call handler if set + if behavior._message_handler: + behavior._message_handler(event) + + logger.debug("Received message %s from %s on topic %s", msg_id.hex()[:8], peer_id, msg.topic) + + +async def handle_control( + behavior: GossipsubBehavior, peer_id: PeerId, control: ControlMessage +) -> None: + """Handle control messages from a peer.""" + state = behavior._peers.get(peer_id) + if state is None: + return + + # Handle GRAFT + for graft in control.graft: + await handle_graft(behavior, peer_id, graft) + + # Handle PRUNE + for prune in control.prune: + await handle_prune(behavior, peer_id, prune) + + # Handle IHAVE + for ihave in control.ihave: + await handle_ihave(behavior, peer_id, ihave) + + # Handle IWANT + for iwant in control.iwant: + await handle_iwant(behavior, peer_id, iwant) + + +async def handle_graft(behavior: GossipsubBehavior, peer_id: PeerId, graft: ControlGraft) -> None: + """Handle a GRAFT request from a peer.""" + topic = graft.topic_id + + # Check if we're subscribed to the topic + if topic not in behavior._subscriptions: + # Send PRUNE - we're not subscribed + prune = ControlPrune(topic_id=topic, backoff=PRUNE_BACKOFF) + prune_rpc = RPC(control=ControlMessage(prune=[prune])) + await behavior._send_rpc(peer_id, prune_rpc) + return + + # Check mesh size + mesh_peers = behavior.mesh.get_mesh_peers(topic) + if len(mesh_peers) >= behavior.params.d_high: + # Mesh is full, send PRUNE + prune = ControlPrune(topic_id=topic, backoff=PRUNE_BACKOFF) + prune_rpc = RPC(control=ControlMessage(prune=[prune])) + await behavior._send_rpc(peer_id, prune_rpc) + return + + # Accept GRAFT + behavior.mesh.add_to_mesh(topic, peer_id) + logger.debug("Accepted GRAFT from %s for topic %s", peer_id, topic) + + +async def handle_prune(behavior: GossipsubBehavior, peer_id: PeerId, prune: ControlPrune) -> None: + """Handle a PRUNE notification from a peer.""" + topic = prune.topic_id + state = behavior._peers.get(peer_id) + + # Remove from mesh + behavior.mesh.remove_from_mesh(topic, peer_id) + + # Set backoff + if state and prune.backoff > 0: + state.backoff[topic] = time.time() + prune.backoff + + logger.debug("Received PRUNE from %s for topic %s (backoff=%ds)", peer_id, topic, prune.backoff) + + +async def handle_ihave(behavior: GossipsubBehavior, peer_id: PeerId, ihave: ControlIHave) -> None: + """Handle an IHAVE advertisement from a peer.""" + # Find messages we don't have + wanted = [] + for msg_id in ihave.message_ids: + # Convert bytes to Bytes20 for cache lookup + if len(msg_id) != 20: + continue + msg_id_typed = Bytes20(msg_id) + if not behavior.seen_cache.has(msg_id_typed) and not behavior.message_cache.has( + msg_id_typed + ): + wanted.append(msg_id) + + if not wanted: + return + + # Send IWANT + iwant_rpc = RPC(control=ControlMessage(iwant=[ControlIWant(message_ids=wanted)])) + await behavior._send_rpc(peer_id, iwant_rpc) + + logger.debug("Sent IWANT for %d messages from %s", len(wanted), peer_id) + + +async def handle_iwant(behavior: GossipsubBehavior, peer_id: PeerId, iwant: ControlIWant) -> None: + """Handle an IWANT request from a peer.""" + messages = [] + + for msg_id in iwant.message_ids: + # Convert bytes to Bytes20 for cache lookup + if len(msg_id) != 20: + continue + msg_id_typed = Bytes20(msg_id) + cached = behavior.message_cache.get(msg_id_typed) + if cached: + messages.append(Message(topic=cached.topic.decode("utf-8"), data=cached.raw_data)) + + if messages: + rpc = RPC(publish=messages) + await behavior._send_rpc(peer_id, rpc) + + logger.debug("Sent %d messages in response to IWANT from %s", len(messages), peer_id) diff --git a/src/lean_spec/subspecs/networking/gossipsub/heartbeat.py b/src/lean_spec/subspecs/networking/gossipsub/heartbeat.py new file mode 100644 index 00000000..a2158081 --- /dev/null +++ b/src/lean_spec/subspecs/networking/gossipsub/heartbeat.py @@ -0,0 +1,165 @@ +""" +Gossipsub Heartbeat and Mesh Maintenance +======================================== + +Periodic maintenance tasks for the gossipsub mesh. + +The heartbeat runs at regular intervals (default 700ms) and: + +1. Maintains mesh sizes (GRAFT if < D_low, PRUNE if > D_high) +2. Sends IHAVE gossip to non-mesh peers +3. Ages the message cache +4. Cleans up seen cache + +References: +----------- +- Gossipsub v1.1: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from typing import TYPE_CHECKING + +from lean_spec.subspecs.networking.config import PRUNE_BACKOFF +from lean_spec.subspecs.networking.gossipsub.rpc import ( + RPC, + ControlIHave, + ControlMessage, + ControlPrune, + create_graft_rpc, +) + +if TYPE_CHECKING: + from lean_spec.subspecs.networking.gossipsub.behavior import GossipsubBehavior + +logger = logging.getLogger(__name__) + + +async def heartbeat_loop(behavior: GossipsubBehavior) -> None: + """Background heartbeat for mesh maintenance.""" + interval = behavior.params.heartbeat_interval_secs + + while behavior._running: + try: + await asyncio.sleep(interval) + await heartbeat(behavior) + except asyncio.CancelledError: + break + except Exception as e: + logger.warning("Heartbeat error: %s", e) + + +async def heartbeat(behavior: GossipsubBehavior) -> None: + """ + Perform heartbeat maintenance. + + The heartbeat: + + 1. Maintains mesh sizes (GRAFT if < D_low, PRUNE if > D_high) + 2. Sends IHAVE gossip to non-mesh peers + 3. Ages the message cache + """ + now = time.time() + + for topic in behavior._subscriptions: + await maintain_mesh(behavior, topic, now) + await emit_gossip(behavior, topic) + + # Age message cache + behavior.message_cache.shift() + + # Clean up seen cache + behavior.seen_cache.cleanup(now) + + +async def maintain_mesh(behavior: GossipsubBehavior, topic: str, now: float) -> None: + """Maintain mesh size for a topic.""" + mesh_peers = behavior.mesh.get_mesh_peers(topic) + mesh_size = len(mesh_peers) + + # Find eligible peers (subscribed to topic, not in mesh, and can send to). + # + # IMPORTANT: Only consider peers we can actually send to. + # If we don't have an outbound stream yet (peer just connected, stream + # setup still in progress), skip them. They'll become eligible once + # their outbound stream is established. + eligible = [] + for peer_id, state in behavior._peers.items(): + # Must have outbound stream to send GRAFT + if state.outbound_stream is None: + continue + if topic in state.subscriptions and peer_id not in mesh_peers: + # Check backoff + backoff_until = state.backoff.get(topic, 0) + if now >= backoff_until: + eligible.append(peer_id) + + # GRAFT if too few peers + if mesh_size < behavior.params.d_low and eligible: + needed = behavior.params.d - mesh_size + to_graft = eligible[: min(needed, len(eligible))] + + for peer_id in to_graft: + behavior.mesh.add_to_mesh(topic, peer_id) + + # Send GRAFT + rpc = create_graft_rpc([topic]) + for peer_id in to_graft: + await behavior._send_rpc(peer_id, rpc) + + logger.debug("GRAFT %d peers for topic %s", len(to_graft), topic) + + # PRUNE if too many peers + elif mesh_size > behavior.params.d_high: + # Keep peers with best scores (for now, just take first D) + to_prune = list(mesh_peers)[behavior.params.d :] + + for peer_id in to_prune: + behavior.mesh.remove_from_mesh(topic, peer_id) + + # Send PRUNE + prune_rpc = RPC( + control=ControlMessage(prune=[ControlPrune(topic_id=topic, backoff=PRUNE_BACKOFF)]) + ) + for peer_id in to_prune: + await behavior._send_rpc(peer_id, prune_rpc) + + logger.debug("PRUNE %d peers for topic %s", len(to_prune), topic) + + +async def emit_gossip(behavior: GossipsubBehavior, topic: str) -> None: + """Send IHAVE gossip to non-mesh peers.""" + # Get message IDs from cache + msg_ids = behavior.message_cache.get_gossip_ids(topic) + if not msg_ids: + return + + # Get all connected peers subscribed to this topic (with outbound streams). + # + # Only include peers we can actually send to. Peers without outbound + # streams yet (still setting up) are skipped. + all_topic_peers = { + p + for p, state in behavior._peers.items() + if topic in state.subscriptions and state.outbound_stream is not None + } + + # Select D_lazy non-mesh peers + gossip_peers = behavior.mesh.select_peers_for_gossip(topic, all_topic_peers) + if not gossip_peers: + return + + # Send IHAVE + msg_id_bytes = [msg_id if isinstance(msg_id, bytes) else bytes(msg_id) for msg_id in msg_ids] + ihave = ControlIHave(topic_id=topic, message_ids=msg_id_bytes) + rpc = RPC(control=ControlMessage(ihave=[ihave])) + + for peer_id in gossip_peers: + await behavior._send_rpc(peer_id, rpc) + + logger.debug( + "IHAVE %d messages to %d peers for topic %s", len(msg_ids), len(gossip_peers), topic + ) diff --git a/src/lean_spec/subspecs/networking/gossipsub/mcache.py b/src/lean_spec/subspecs/networking/gossipsub/mcache.py index 1e14fe17..86604f01 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/mcache.py +++ b/src/lean_spec/subspecs/networking/gossipsub/mcache.py @@ -95,23 +95,6 @@ class MessageCache: - **IWANT responses**: Retrieve full messages by ID - **IHAVE gossip**: Get message IDs for advertisement - - Example:: - - cache = MessageCache(mcache_len=6, mcache_gossip=3) - - # Add messages - cache.put("blocks", msg1) - cache.put("blocks", msg2) - - # Get message IDs for IHAVE - ids = cache.get_gossip_ids("blocks") - - # Respond to IWANT - msg = cache.get(requested_id) - - # Shift window (called each heartbeat) - evicted = cache.shift() """ mcache_len: int = 6 diff --git a/src/lean_spec/subspecs/networking/gossipsub/mesh.py b/src/lean_spec/subspecs/networking/gossipsub/mesh.py index 6cfa22d3..173aa822 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/mesh.py +++ b/src/lean_spec/subspecs/networking/gossipsub/mesh.py @@ -139,22 +139,6 @@ class MeshState: Central data structure managing mesh topology across all topics. Provides operations for subscription management, peer tracking, and gossip peer selection. - - Example:: - - state = MeshState(params=GossipsubParameters()) - - # Subscribe and build mesh - state.subscribe("blocks") - state.add_to_mesh("blocks", "peer1") - state.add_to_mesh("blocks", "peer2") - - # Get mesh peers for message forwarding - peers = state.get_mesh_peers("blocks") # {"peer1", "peer2"} - - # Select peers for IHAVE gossip - all_peers = {"peer1", "peer2", "peer3", "peer4"} - gossip_peers = state.select_peers_for_gossip("blocks", all_peers) """ params: GossipsubParameters diff --git a/src/lean_spec/subspecs/networking/gossipsub/message.py b/src/lean_spec/subspecs/networking/gossipsub/message.py index ec804afc..22ac0a0c 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/message.py +++ b/src/lean_spec/subspecs/networking/gossipsub/message.py @@ -73,17 +73,6 @@ class SnappyDecompressor(Protocol): Any callable matching this signature can be used for decompression. The function should raise an exception if decompression fails. - - Example:: - - import snappy - - # Using python-snappy library - msg = GossipsubMessage( - topic=b"/leanconsensus/...", - raw_data=compressed_data, - snappy_decompress=snappy.decompress, - ) """ def __call__(self, data: bytes) -> bytes: @@ -194,14 +183,6 @@ def compute_id( Returns: 20-byte message ID. - - Example:: - - msg_id = GossipsubMessage.compute_id( - topic=b"/leanconsensus/0x12345678/block/ssz_snappy", - data=block_bytes, - snappy_decompress=snappy.decompress, - ) """ if snappy_decompress is not None: try: diff --git a/src/lean_spec/subspecs/networking/gossipsub/parameters.py b/src/lean_spec/subspecs/networking/gossipsub/parameters.py index db96c5e2..f72e6e18 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/parameters.py +++ b/src/lean_spec/subspecs/networking/gossipsub/parameters.py @@ -159,17 +159,3 @@ class GossipsubParameters(StrictBaseModel): be long enough to cover network propagation delays but short enough to bound memory usage. """ - - # ------------------------------------------------------------------------- - # IDONTWANT Optimization (v1.2) - # ------------------------------------------------------------------------- - - idontwant_message_size_threshold: int = 1000 - """Minimum message size in bytes to trigger IDONTWANT. - - When receiving a message larger than this threshold, - immediately send IDONTWANT to mesh peers to prevent - redundant transmissions. - - Set to 1KB by default. - """ diff --git a/src/lean_spec/subspecs/networking/gossipsub/stream.py b/src/lean_spec/subspecs/networking/gossipsub/stream.py new file mode 100644 index 00000000..2488a024 --- /dev/null +++ b/src/lean_spec/subspecs/networking/gossipsub/stream.py @@ -0,0 +1,143 @@ +""" +Gossipsub Stream Management +========================== + +Stream handling for gossipsub RPC exchange. + +This module manages: + +- Sending RPCs to peers (length-prefixed framing) +- Receiving RPCs from peers (buffered parsing) +- Broadcasting subscriptions to all peers + +References: +----------- +- Gossipsub v1.1: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from typing import TYPE_CHECKING, Any + +from lean_spec.subspecs.networking.gossipsub.rpc import ( + RPC, + create_graft_rpc, + create_subscription_rpc, +) +from lean_spec.subspecs.networking.transport import PeerId +from lean_spec.subspecs.networking.varint import decode_varint, encode_varint + +if TYPE_CHECKING: + from lean_spec.subspecs.networking.gossipsub.behavior import GossipsubBehavior + +logger = logging.getLogger(__name__) + + +async def send_rpc(behavior: GossipsubBehavior, peer_id: PeerId, rpc: RPC) -> None: + """Send an RPC to a peer on the outbound stream.""" + state = behavior._peers.get(peer_id) + if state is None or state.outbound_stream is None: + # Expected during stream setup - peer might only have inbound stream yet. + # The outbound stream will be established shortly. + logger.debug("Cannot send RPC to %s: no outbound stream yet", peer_id) + return + + try: + data = rpc.encode() + # Length-prefix the RPC (varint + data) + frame = encode_varint(len(data)) + data + logger.debug( + "Sending RPC to %s: %d bytes (subs=%d, msgs=%d)", + peer_id, + len(frame), + len(rpc.subscriptions), + len(rpc.publish), + ) + state.outbound_stream.write(frame) + await state.outbound_stream.drain() + logger.debug("RPC sent and drained to %s", peer_id) + state.last_rpc_time = time.time() + except Exception as e: + logger.warning("Failed to send RPC to %s: %s", peer_id, e) + + +async def receive_loop(behavior: GossipsubBehavior, peer_id: PeerId, stream: Any) -> None: + """Receive and process RPCs from a peer.""" + from lean_spec.subspecs.networking.gossipsub.handlers import handle_rpc + + buffer = bytearray() + logger.debug("Starting receive loop for peer %s", peer_id) + + try: + while behavior._running and peer_id in behavior._peers: + try: + chunk = await stream.read() + if not chunk: + logger.debug("Receive loop got empty chunk from %s, exiting", peer_id) + break + logger.debug("Received %d bytes from %s", len(chunk), peer_id) + buffer.extend(chunk) + + # Try to parse complete RPCs + while buffer: + try: + # Read length prefix + if len(buffer) < 1: + break + length, varint_size = decode_varint(bytes(buffer), 0) + if len(buffer) < varint_size + length: + break + + # Extract and parse RPC + rpc_data = bytes(buffer[varint_size : varint_size + length]) + buffer = buffer[varint_size + length :] + + rpc = RPC.decode(rpc_data) + logger.debug( + "Received RPC from %s: subs=%d, msgs=%d, ctrl=%s", + peer_id, + len(rpc.subscriptions), + len(rpc.publish), + bool(rpc.control), + ) + await handle_rpc(behavior, peer_id, rpc) + except Exception as e: + logger.warning("Error parsing RPC from %s: %s", peer_id, e) + break + + except asyncio.CancelledError: + break + except Exception as e: + logger.warning("Error receiving from %s: %s", peer_id, e) + break + + finally: + # Clean up peer on disconnect + await behavior.remove_peer(peer_id) + + +async def broadcast_subscription(behavior: GossipsubBehavior, topic: str, subscribe: bool) -> None: + """Broadcast subscription change to all peers.""" + rpc = create_subscription_rpc([topic], subscribe) + + # Only send to peers we have outbound streams for + for peer_id, state in behavior._peers.items(): + if state.outbound_stream is not None: + await behavior._send_rpc(peer_id, rpc) + + # If subscribing, send GRAFT to eligible peers (must have outbound stream) + if subscribe: + eligible = [ + p + for p, s in behavior._peers.items() + if topic in s.subscriptions and s.outbound_stream is not None + ][: behavior.params.d] + + if eligible: + graft_rpc = create_graft_rpc([topic]) + for peer_id in eligible: + behavior.mesh.add_to_mesh(topic, peer_id) + await behavior._send_rpc(peer_id, graft_rpc) diff --git a/src/lean_spec/subspecs/networking/gossipsub/topic.py b/src/lean_spec/subspecs/networking/gossipsub/topic.py index dda9438d..6280a6b0 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/topic.py +++ b/src/lean_spec/subspecs/networking/gossipsub/topic.py @@ -194,10 +194,6 @@ def from_string(cls, topic_str: str) -> GossipTopic: Raises: ValueError: If the topic string is malformed. - - Example:: - - topic = GossipTopic.from_string("/leanconsensus/0x12345678/block/ssz_snappy") """ parts = topic_str.lstrip("/").split("/") @@ -285,11 +281,6 @@ def format_topic_string( Returns: Formatted topic string. - - Example:: - - topic_str = format_topic_string("block", "0x12345678") - assert topic_str == "/leanconsensus/0x12345678/block/ssz_snappy" """ return f"/{prefix}/{fork_digest}/{topic_name}/{encoding}" @@ -308,14 +299,6 @@ def parse_topic_string(topic_str: str) -> tuple[str, str, str, str]: Raises: ValueError: If the topic string is malformed. - - Example:: - - prefix, fork, name, enc = parse_topic_string("/leanconsensus/0x12345678/block/ssz_snappy") - assert prefix == "leanconsensus" - assert fork == "0x12345678" - assert name == "block" - assert enc == "ssz_snappy" """ parts = topic_str.lstrip("/").split("/") diff --git a/src/lean_spec/subspecs/ssz/merkle_proof/__init__.py b/src/lean_spec/subspecs/ssz/merkle_proof/__init__.py deleted file mode 100644 index 95842bc8..00000000 --- a/src/lean_spec/subspecs/ssz/merkle_proof/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""SSZ Merkle related functionality.""" diff --git a/src/lean_spec/subspecs/ssz/merkle_proof/gindex.py b/src/lean_spec/subspecs/ssz/merkle_proof/gindex.py deleted file mode 100644 index 6fc6e05d..00000000 --- a/src/lean_spec/subspecs/ssz/merkle_proof/gindex.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Generalized Index implementation.""" - -from pydantic import Field - -from lean_spec.types import StrictBaseModel - - -class GeneralizedIndex(StrictBaseModel): - """ - Represents a Generalized Merkle Tree Index. - - Helper methods are provided for tree navigation. - """ - - value: int = Field(..., gt=0, description="The index value, must be a positive integer.") - - def __hash__(self) -> int: - """Hashes the index value.""" - return hash(self.value) - - @property - def depth(self) -> int: - """The depth of the node in the tree.""" - return self.value.bit_length() - 1 - - def get_bit(self, position: int) -> bool: - """Returns the bit at a specific position (from the right).""" - return (self.value >> position) & 1 == 1 - - @property - def sibling(self) -> "GeneralizedIndex": - """Returns the index of the sibling node.""" - return type(self)(value=self.value ^ 1) - - @property - def parent(self) -> "GeneralizedIndex": - """Returns the index of the parent node.""" - if self.value <= 1: - raise ValueError("Root node has no parent.") - return type(self)(value=self.value // 2) - - def child(self, right_side: bool) -> "GeneralizedIndex": - """ - Returns the index of a child node. - - - `right_side=False` for the left child (2k), - - `right_side=True` for the right child (2k+1). - """ - return type(self)(value=self.value * 2 + int(right_side)) - - def get_branch_indices(self) -> list["GeneralizedIndex"]: - """Gets the indices of the sibling nodes along the path to the root.""" - indices: list["GeneralizedIndex"] = [] - current_index = self.value - while current_index > 1: - # Sibling is the current index XOR 1 - sibling_index = current_index ^ 1 - indices.append(type(self)(value=sibling_index)) - # Move up to the parent - current_index //= 2 - return indices - - def get_path_indices(self) -> list["GeneralizedIndex"]: - """Gets the indices of the nodes along the path to the root.""" - indices: list["GeneralizedIndex"] = [self] - while indices[-1].value > 1: - indices.append(indices[-1].parent) - return indices[:-1] diff --git a/src/lean_spec/subspecs/ssz/merkle_proof/proof.py b/src/lean_spec/subspecs/ssz/merkle_proof/proof.py deleted file mode 100644 index ef1334b0..00000000 --- a/src/lean_spec/subspecs/ssz/merkle_proof/proof.py +++ /dev/null @@ -1,136 +0,0 @@ -"""Merkle proofs for SSZ.""" - -from __future__ import annotations - -from typing import Sequence - -from pydantic import Field, model_validator - -from lean_spec.types import ZERO_HASH, StrictBaseModel -from lean_spec.types.byte_arrays import Bytes32 - -from ..utils import hash_nodes -from .gindex import GeneralizedIndex - -Root = Bytes32 -"""The type of a Merkle tree root.""" -Proof = Sequence[Bytes32] -"""The type of a Merkle proof.""" -ProofHashes = Sequence[Bytes32] -"""The type of a Merkle proof's helper nodes.""" - - -class MerkleProof(StrictBaseModel): - """ - Represents a Merkle multiproof, encapsulating its data and verification logic. - - This object is immutable; once created, its contents cannot be changed. - """ - - leaves: Sequence[Bytes32] = Field(..., description="The leaf data being proven.") - - indices: Sequence[GeneralizedIndex] = Field( - ..., description="The generalized indices of the leaves." - ) - - proof_hashes: ProofHashes = Field(..., description="The helper nodes required for the proof.") - - @model_validator(mode="after") - def check_leaves_and_indices_length(self) -> MerkleProof: - """Ensures the number of leaves matches the number of indices.""" - if len(self.leaves) != len(self.indices): - raise ValueError("The number of leaves must match the number of indices.") - return self - - @classmethod - def from_single_leaf( - cls, leaf: Bytes32, proof_hashes: ProofHashes, index: GeneralizedIndex - ) -> MerkleProof: - """Creates a MerkleProof object from a traditional single-item proof.""" - return cls(leaves=[leaf], proof_hashes=proof_hashes, indices=[index]) - - def _get_helper_indices(self) -> list[GeneralizedIndex]: - """ - Calculates the generalized indices of all "helper" nodes needed to prove the leaves. - - This is an internal helper method. - """ - all_helper_indices: set[GeneralizedIndex] = set() - all_path_indices: set[GeneralizedIndex] = set() - - for index in self.indices: - all_helper_indices.update(index.get_branch_indices()) - all_path_indices.update(index.get_path_indices()) - - return sorted(all_helper_indices - all_path_indices, key=lambda g: g.value, reverse=True) - - def calculate_root(self) -> Root: - """ - Calculates the Merkle root from the proof's leaves and helper nodes. - - Handles both single and multi-leaf proofs seamlessly. - """ - # For a single leaf proof, use the more direct calculation. - if len(self.indices) == 1: - index = self.indices[0] - leaf = self.leaves[0] - if len(self.proof_hashes) != index.depth: - raise ValueError("Proof length must match the depth of the index.") - - root = leaf - for i, branch_node in enumerate(self.proof_hashes): - if index.get_bit(i): - root = hash_nodes(branch_node, root) - else: - root = hash_nodes(root, branch_node) - return root - - # For multi-leaf proofs, perform tree reconstruction. - helper_indices = self._get_helper_indices() - if len(self.proof_hashes) != len(helper_indices): - raise ValueError("Proof length does not match the required number of helper nodes.") - - # 1. Start with the known nodes (leaves and proof hashes). - tree: dict[int, Bytes32] = { - **{index.value: node for index, node in zip(self.indices, self.leaves, strict=False)}, - **{ - index.value: node - for index, node in zip(helper_indices, self.proof_hashes, strict=False) - }, - } - - # 2. Process nodes from deepest to shallowest. - # The list of keys will grow as we create new parent nodes. - keys = sorted(tree.keys(), reverse=True) - pos = 0 - while pos < len(keys): - key = keys[pos] - sibling_key = key ^ 1 - - # 3. If a node's sibling is also in the tree, we can create their parent. - if sibling_key in tree: - parent_key = key // 2 - - # Ensure we don't re-calculate a parent we already have. - if parent_key not in tree: - # The order of hashing depends on which key is smaller. - if key < sibling_key: - tree[parent_key] = hash_nodes(tree[key], tree[sibling_key]) - else: - tree[parent_key] = hash_nodes(tree[sibling_key], tree[key]) - keys.append(parent_key) - pos += 1 - - # 4. After processing all nodes, the root must be at index 1. - if 1 not in tree: - # This can happen if the proof is incomplete or for an empty leaf set. - return ZERO_HASH - - return tree[1] - - def verify(self, root: Root) -> bool: - """Verifies the Merkle proof against a known root.""" - try: - return self.calculate_root() == root - except ValueError: - return False diff --git a/src/lean_spec/subspecs/ssz/merkle_proof/tree.py b/src/lean_spec/subspecs/ssz/merkle_proof/tree.py deleted file mode 100644 index 8206cc5a..00000000 --- a/src/lean_spec/subspecs/ssz/merkle_proof/tree.py +++ /dev/null @@ -1,43 +0,0 @@ -"""Merkle tree building logic.""" - -from typing import Sequence - -from lean_spec.subspecs.ssz.utils import get_power_of_two_ceil, hash_nodes -from lean_spec.types import ZERO_HASH -from lean_spec.types.byte_arrays import Bytes32 - - -def build_merkle_tree(leaves: Sequence[Bytes32]) -> list[Bytes32]: - r""" - Builds a full Merkle tree and returns it as a flat list. - - The tree is represented as a list where the node at a generalized - index `i` is located at `tree[i]`. The 0-index is a placeholder. - """ - # Handle the edge case of no leaves. - if not leaves: - # Per the spec, a tree of an empty list is a single ZERO_HASH. - # - # The flat list format includes a placeholder at index 0. - return [ZERO_HASH] * 2 - - # Calculate the required size of the bottom layer (must be a power of two). - bottom_layer_size = get_power_of_two_ceil(len(leaves)) - - # Create the complete, padded leaf layer. - padded_leaves = list(leaves) + [ZERO_HASH] * (bottom_layer_size - len(leaves)) - - # Initialize the tree with placeholders for parent nodes and the padded leaves. - # - # The first half of the list will store the calculated parent nodes. - tree = [ZERO_HASH] * bottom_layer_size + padded_leaves - - # Iterate backwards from the last parent node up to the root. - # - # This calculates the tree from the bottom up. - for i in range(bottom_layer_size - 1, 0, -1): - # A parent at index `i` is the hash of its two children at `2*i` and `2*i+1`. - tree[i] = hash_nodes(tree[i * 2], tree[i * 2 + 1]) - - # Return the fully constructed tree. - return tree diff --git a/src/lean_spec/types/__init__.py b/src/lean_spec/types/__init__.py index cc872071..5004e14b 100644 --- a/src/lean_spec/types/__init__.py +++ b/src/lean_spec/types/__init__.py @@ -1,10 +1,9 @@ """Reusable type definitions for the Lean Ethereum specification.""" from .base import CamelModel, StrictBaseModel -from .basispt import BasisPoint from .bitfields import BaseBitlist from .boolean import Boolean -from .byte_arrays import ZERO_HASH, Bytes12, Bytes16, Bytes20, Bytes32, Bytes33, Bytes52, Bytes64 +from .byte_arrays import ZERO_HASH, Bytes16, Bytes20, Bytes32, Bytes33, Bytes52, Bytes64 from .collections import SSZList, SSZVector from .container import Container from .exceptions import ( @@ -21,8 +20,6 @@ # Core types "BaseBitlist", "Uint64", - "BasisPoint", - "Bytes12", "Bytes16", "Bytes20", "Bytes32", diff --git a/src/lean_spec/types/basispt.py b/src/lean_spec/types/basispt.py deleted file mode 100644 index eb229011..00000000 --- a/src/lean_spec/types/basispt.py +++ /dev/null @@ -1,16 +0,0 @@ -"""Basis Point Type Specification.""" - -from pydantic import Field -from typing_extensions import Annotated - -from .uint import Uint64 - -BasisPoint = Annotated[ - Uint64, - Field(le=Uint64(10000), description="A value in basis points (1/10000)."), -] -""" -A type alias for basis points. - -A basis point (bps) is 1/100th of a percent. 100% = 10,000 bps. -""" diff --git a/src/lean_spec/types/byte_arrays.py b/src/lean_spec/types/byte_arrays.py index a90ada39..70e55173 100644 --- a/src/lean_spec/types/byte_arrays.py +++ b/src/lean_spec/types/byte_arrays.py @@ -209,12 +209,6 @@ class Bytes8(BaseBytes): LENGTH = 8 -class Bytes12(BaseBytes): - """Fixed-size byte array of exactly 12 bytes (ChaCha20-Poly1305 nonce).""" - - LENGTH = 12 - - class Bytes16(BaseBytes): """Fixed-size byte array of exactly 16 bytes (Poly1305 authentication tag).""" diff --git a/tests/lean_spec/subspecs/metrics/test_registry.py b/tests/lean_spec/subspecs/metrics/test_registry.py index 1b90b6bc..fbea11e8 100644 --- a/tests/lean_spec/subspecs/metrics/test_registry.py +++ b/tests/lean_spec/subspecs/metrics/test_registry.py @@ -4,20 +4,14 @@ from lean_spec.subspecs.metrics import ( REGISTRY, - attestations_invalid, attestations_produced, - attestations_received, - attestations_valid, block_processing_time, blocks_processed, blocks_proposed, - current_slot, finalized_slot, generate_metrics, head_slot, justified_slot, - peers_connected, - reorgs, validators_count, ) @@ -59,7 +53,6 @@ class TestMetricDefinitions: def test_node_information_gauges_exist(self) -> None: """Node information gauges are defined.""" assert head_slot is not None - assert current_slot is not None assert justified_slot is not None assert finalized_slot is not None assert validators_count is not None @@ -69,20 +62,6 @@ def test_block_processing_metrics_exist(self) -> None: assert blocks_processed is not None assert block_processing_time is not None - def test_attestation_metrics_exist(self) -> None: - """Attestation metrics are defined.""" - assert attestations_received is not None - assert attestations_valid is not None - assert attestations_invalid is not None - - def test_network_metrics_exist(self) -> None: - """Network metrics are defined.""" - assert peers_connected is not None - - def test_consensus_event_metrics_exist(self) -> None: - """Consensus event metrics are defined.""" - assert reorgs is not None - def test_validator_production_metrics_exist(self) -> None: """Validator production metrics are defined.""" assert blocks_proposed is not None @@ -104,8 +83,6 @@ def test_output_contains_metric_names(self) -> None: assert "lean_head_slot" in output assert "lean_blocks_processed_total" in output assert "lean_block_processing_seconds" in output - assert "lean_attestations_received_total" in output - assert "lean_peers_connected" in output def test_output_contains_help_text(self) -> None: """Output contains HELP lines for metrics.""" diff --git a/tests/lean_spec/subspecs/networking/test_discovery.py b/tests/lean_spec/subspecs/networking/test_discovery.py index 6c734499..66ebad5c 100644 --- a/tests/lean_spec/subspecs/networking/test_discovery.py +++ b/tests/lean_spec/subspecs/networking/test_discovery.py @@ -10,22 +10,15 @@ Distance, FindNode, IdNonce, - IPv4, - IPv6, - KBucket, MessageType, Nodes, Nonce, - PacketFlag, Ping, Pong, - Port, RequestId, RoutingTable, - StaticHeader, TalkReq, TalkResp, - WhoAreYouAuthdata, ) from lean_spec.subspecs.networking.discovery.config import ( ALPHA, @@ -38,7 +31,16 @@ MIN_PACKET_SIZE, REQUEST_TIMEOUT_SECS, ) +from lean_spec.subspecs.networking.discovery.messages import ( + IPv4, + IPv6, + PacketFlag, + Port, + StaticHeader, + WhoAreYouAuthdata, +) from lean_spec.subspecs.networking.discovery.routing import ( + KBucket, NodeEntry, log2_distance, xor_distance, diff --git a/tests/lean_spec/subspecs/networking/test_gossipsub.py b/tests/lean_spec/subspecs/networking/test_gossipsub.py index eafbe787..4b793239 100644 --- a/tests/lean_spec/subspecs/networking/test_gossipsub.py +++ b/tests/lean_spec/subspecs/networking/test_gossipsub.py @@ -9,7 +9,6 @@ ) from lean_spec.subspecs.networking.gossipsub import ( ControlMessage, - FanoutEntry, ForkMismatchError, GossipsubMessage, GossipsubParameters, @@ -18,15 +17,13 @@ IDontWant, IHave, IWant, - MeshState, - MessageCache, Prune, - SeenCache, TopicKind, - TopicMesh, format_topic_string, parse_topic_string, ) +from lean_spec.subspecs.networking.gossipsub.mcache import MessageCache, SeenCache +from lean_spec.subspecs.networking.gossipsub.mesh import FanoutEntry, MeshState, TopicMesh def peer(name: str) -> PeerId: @@ -502,7 +499,7 @@ class TestRPCProtobufEncoding: def test_varint_encoding(self) -> None: """Test varint encoding matches protobuf spec.""" - from lean_spec.subspecs.networking.gossipsub import encode_varint + from lean_spec.subspecs.networking.varint import encode_varint # Single byte varints (0-127) assert encode_varint(0) == b"\x00" @@ -519,7 +516,7 @@ def test_varint_encoding(self) -> None: def test_varint_decoding(self) -> None: """Test varint decoding matches protobuf spec.""" - from lean_spec.subspecs.networking.gossipsub import decode_varint + from lean_spec.subspecs.networking.varint import decode_varint # Single byte value, pos = decode_varint(b"\x00", 0) @@ -541,7 +538,7 @@ def test_varint_decoding(self) -> None: def test_varint_roundtrip(self) -> None: """Test varint encode/decode roundtrip.""" - from lean_spec.subspecs.networking.gossipsub import decode_varint, encode_varint + from lean_spec.subspecs.networking.varint import decode_varint, encode_varint test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 2097151, 268435455] for value in test_values: @@ -551,7 +548,7 @@ def test_varint_roundtrip(self) -> None: def test_subopts_encode_decode(self) -> None: """Test SubOpts (subscription) encoding/decoding.""" - from lean_spec.subspecs.networking.gossipsub import SubOpts + from lean_spec.subspecs.networking.gossipsub.rpc import SubOpts # Subscribe sub = SubOpts(subscribe=True, topic_id="/leanconsensus/0x12345678/block/ssz_snappy") @@ -571,7 +568,7 @@ def test_subopts_encode_decode(self) -> None: def test_message_encode_decode(self) -> None: """Test Message encoding/decoding.""" - from lean_spec.subspecs.networking.gossipsub import RPCMessage + from lean_spec.subspecs.networking.gossipsub.rpc import Message as RPCMessage msg = RPCMessage( from_peer=b"peer123", @@ -593,7 +590,7 @@ def test_message_encode_decode(self) -> None: def test_message_minimal(self) -> None: """Test Message with only required fields.""" - from lean_spec.subspecs.networking.gossipsub import RPCMessage + from lean_spec.subspecs.networking.gossipsub.rpc import Message as RPCMessage msg = RPCMessage(topic="/test/topic", data=b"payload") encoded = msg.encode() @@ -606,7 +603,7 @@ def test_message_minimal(self) -> None: def test_control_graft_encode_decode(self) -> None: """Test ControlGraft encoding/decoding.""" - from lean_spec.subspecs.networking.gossipsub import RPCControlGraft + from lean_spec.subspecs.networking.gossipsub.rpc import ControlGraft as RPCControlGraft graft = RPCControlGraft(topic_id="/test/blocks") encoded = graft.encode() @@ -616,7 +613,7 @@ def test_control_graft_encode_decode(self) -> None: def test_control_prune_encode_decode(self) -> None: """Test ControlPrune encoding/decoding with backoff.""" - from lean_spec.subspecs.networking.gossipsub import RPCControlPrune + from lean_spec.subspecs.networking.gossipsub.rpc import ControlPrune as RPCControlPrune prune = RPCControlPrune(topic_id="/test/blocks", backoff=60) encoded = prune.encode() @@ -627,7 +624,7 @@ def test_control_prune_encode_decode(self) -> None: def test_control_ihave_encode_decode(self) -> None: """Test ControlIHave encoding/decoding.""" - from lean_spec.subspecs.networking.gossipsub import RPCControlIHave + from lean_spec.subspecs.networking.gossipsub.rpc import ControlIHave as RPCControlIHave msg_ids = [b"msgid1234567890ab", b"msgid2345678901bc", b"msgid3456789012cd"] ihave = RPCControlIHave(topic_id="/test/blocks", message_ids=msg_ids) @@ -639,7 +636,7 @@ def test_control_ihave_encode_decode(self) -> None: def test_control_iwant_encode_decode(self) -> None: """Test ControlIWant encoding/decoding.""" - from lean_spec.subspecs.networking.gossipsub import RPCControlIWant + from lean_spec.subspecs.networking.gossipsub.rpc import ControlIWant as RPCControlIWant msg_ids = [b"msgid1234567890ab", b"msgid2345678901bc"] iwant = RPCControlIWant(message_ids=msg_ids) @@ -650,7 +647,9 @@ def test_control_iwant_encode_decode(self) -> None: def test_control_idontwant_encode_decode(self) -> None: """Test ControlIDontWant encoding/decoding (v1.2).""" - from lean_spec.subspecs.networking.gossipsub import RPCControlIDontWant + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlIDontWant as RPCControlIDontWant, + ) msg_ids = [b"msgid1234567890ab"] idontwant = RPCControlIDontWant(message_ids=msg_ids) @@ -661,11 +660,17 @@ def test_control_idontwant_encode_decode(self) -> None: def test_control_message_aggregate(self) -> None: """Test ControlMessage with multiple control types.""" - from lean_spec.subspecs.networking.gossipsub import ( - RPCControlGraft, - RPCControlIHave, - RPCControlMessage, - RPCControlPrune, + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlGraft as RPCControlGraft, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlIHave as RPCControlIHave, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlMessage as RPCControlMessage, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlPrune as RPCControlPrune, ) ctrl = RPCControlMessage( @@ -686,7 +691,7 @@ def test_control_message_aggregate(self) -> None: def test_rpc_subscription_only(self) -> None: """Test RPC with only subscriptions.""" - from lean_spec.subspecs.networking.gossipsub import RPC, SubOpts + from lean_spec.subspecs.networking.gossipsub.rpc import RPC, SubOpts rpc = RPC( subscriptions=[ @@ -705,7 +710,8 @@ def test_rpc_subscription_only(self) -> None: def test_rpc_publish_only(self) -> None: """Test RPC with only published messages.""" - from lean_spec.subspecs.networking.gossipsub import RPC, RPCMessage + from lean_spec.subspecs.networking.gossipsub.rpc import RPC + from lean_spec.subspecs.networking.gossipsub.rpc import Message as RPCMessage rpc = RPC( publish=[ @@ -723,10 +729,14 @@ def test_rpc_publish_only(self) -> None: def test_rpc_control_only(self) -> None: """Test RPC with only control messages.""" - from lean_spec.subspecs.networking.gossipsub import ( + from lean_spec.subspecs.networking.gossipsub.rpc import ( RPC, - RPCControlGraft, - RPCControlMessage, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlGraft as RPCControlGraft, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlMessage as RPCControlMessage, ) rpc = RPC(control=RPCControlMessage(graft=[RPCControlGraft(topic_id="/blocks")])) @@ -739,14 +749,22 @@ def test_rpc_control_only(self) -> None: def test_rpc_full_message(self) -> None: """Test RPC with all message types (full gossipsub exchange).""" - from lean_spec.subspecs.networking.gossipsub import ( + from lean_spec.subspecs.networking.gossipsub.rpc import ( RPC, - RPCControlGraft, - RPCControlIHave, - RPCControlMessage, - RPCMessage, SubOpts, ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlGraft as RPCControlGraft, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlIHave as RPCControlIHave, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + ControlMessage as RPCControlMessage, + ) + from lean_spec.subspecs.networking.gossipsub.rpc import ( + Message as RPCMessage, + ) rpc = RPC( subscriptions=[SubOpts(subscribe=True, topic_id="/blocks")], @@ -772,7 +790,7 @@ def test_rpc_full_message(self) -> None: def test_rpc_empty_check(self) -> None: """Test RPC is_empty method.""" - from lean_spec.subspecs.networking.gossipsub import RPC, SubOpts + from lean_spec.subspecs.networking.gossipsub.rpc import RPC, SubOpts empty_rpc = RPC() assert empty_rpc.is_empty() @@ -782,7 +800,7 @@ def test_rpc_empty_check(self) -> None: def test_rpc_helper_functions(self) -> None: """Test RPC creation helper functions.""" - from lean_spec.subspecs.networking.gossipsub import ( + from lean_spec.subspecs.networking.gossipsub.rpc import ( create_graft_rpc, create_ihave_rpc, create_iwant_rpc, @@ -829,7 +847,7 @@ def test_wire_format_compatibility(self) -> None: This test verifies that our encoding produces the same bytes as a reference implementation would for simple cases. """ - from lean_spec.subspecs.networking.gossipsub import RPC, SubOpts + from lean_spec.subspecs.networking.gossipsub.rpc import RPC, SubOpts # A subscription RPC with a simple topic rpc = RPC(subscriptions=[SubOpts(subscribe=True, topic_id="test")]) @@ -847,7 +865,8 @@ def test_wire_format_compatibility(self) -> None: def test_large_message_encoding(self) -> None: """Test encoding of large messages (typical block size).""" - from lean_spec.subspecs.networking.gossipsub import RPC, RPCMessage + from lean_spec.subspecs.networking.gossipsub.rpc import RPC + from lean_spec.subspecs.networking.gossipsub.rpc import Message as RPCMessage # Simulate a large block payload (100KB) large_data = b"x" * 100_000 diff --git a/tests/lean_spec/subspecs/ssz/test_gindex.py b/tests/lean_spec/subspecs/ssz/test_gindex.py deleted file mode 100644 index a73fa885..00000000 --- a/tests/lean_spec/subspecs/ssz/test_gindex.py +++ /dev/null @@ -1,186 +0,0 @@ -"""Tests for the GeneralizedIndex class.""" - -import pytest -from pydantic import ValidationError -from typing_extensions import Any - -from lean_spec.subspecs.ssz.merkle_proof.gindex import GeneralizedIndex - - -def test_pydantic_validation_accepts_valid_int() -> None: - """Tests that Pydantic validation correctly accepts a valid positive integer.""" - instance = GeneralizedIndex(value=10) - assert isinstance(instance, GeneralizedIndex) - assert instance.value == 10 - - -@pytest.mark.parametrize("invalid_value", [0, -1, -100]) -def test_pydantic_validation_rejects_non_positive_int(invalid_value: int) -> None: - """Tests that Pydantic validation rejects zero and negative integers.""" - with pytest.raises(ValidationError): - GeneralizedIndex(value=invalid_value) - - -@pytest.mark.parametrize( - "invalid_type", - [1.0, "1", True, False, b"1", None], -) -def test_pydantic_validation_rejects_invalid_types(invalid_type: Any) -> None: - """Tests that Pydantic's strict integer validation rejects non-integer types.""" - with pytest.raises(ValidationError): - GeneralizedIndex(value=invalid_type) - - -@pytest.mark.parametrize( - "value, expected_depth", - [ - (1, 0), - (2, 1), - (3, 1), - (4, 2), - (5, 2), - (6, 2), - (7, 2), - (13, 3), - (31, 4), - ], -) -def test_depth_property(value: int, expected_depth: int) -> None: - """Tests that the `depth` property is calculated correctly.""" - index = GeneralizedIndex(value=value) - assert index.depth == expected_depth - - -@pytest.mark.parametrize( - "index_value, bit_position, expected_bool", - [ - (13, 0, True), # 1101 -> bit 0 is 1 - (13, 1, False), # 1101 -> bit 1 is 0 - (13, 2, True), # 1101 -> bit 2 is 1 - (13, 3, True), # 1101 -> bit 3 is 1 - (13, 4, False), # Out of bounds - (1, 0, True), - (1, 1, False), - ], -) -def test_get_bit_method(index_value: int, bit_position: int, expected_bool: bool) -> None: - """Tests the `get_bit` method for various positions.""" - index = GeneralizedIndex(value=index_value) - assert index.get_bit(bit_position) is expected_bool - - -@pytest.mark.parametrize( - "value, expected_sibling_value", - [ - (2, 3), # Left node - (3, 2), # Right node - (10, 11), # Left node - (11, 10), # Right node - ], -) -def test_sibling_property(value: int, expected_sibling_value: int) -> None: - """Tests that the `sibling` property correctly flips the last bit.""" - index = GeneralizedIndex(value=value) - sibling = index.sibling - assert isinstance(sibling, GeneralizedIndex) - assert sibling.value == expected_sibling_value - - -def test_sibling_of_root_is_invalid() -> None: - """ - Tests that getting the sibling of the root (value=1) produces a GeneralizedIndex - with value=0, which will fail Pydantic's validation upon creation. - """ - with pytest.raises(ValidationError, match="Input should be greater than 0"): - _ = GeneralizedIndex(value=1).sibling - - -@pytest.mark.parametrize( - "value, expected_parent_value", - [ - (2, 1), - (3, 1), - (10, 5), - (11, 5), - (15, 7), - ], -) -def test_parent_property(value: int, expected_parent_value: int) -> None: - """Tests that the `parent` property is calculated correctly.""" - index = GeneralizedIndex(value=value) - parent = index.parent - assert isinstance(parent, GeneralizedIndex) - assert parent.value == expected_parent_value - - -def test_parent_of_root_raises_error() -> None: - """Tests that getting the parent of the root node (value=1) raises a ValueError.""" - root = GeneralizedIndex(value=1) - with pytest.raises(ValueError, match="Root node has no parent."): - _ = root.parent - - -@pytest.mark.parametrize( - "value, expected_indices_values", - [ - (13, [12, 7, 2]), # Path: 13 -> 6 -> 3 -> 1 - (8, [9, 5, 3]), # Path: 8 -> 4 -> 2 -> 1 - (15, [14, 6, 2]), # Path: 15 -> 7 -> 3 -> 1 - (2, [3]), - (3, [2]), - (1, []), # Root node has no branch - ], -) -def test_get_branch_indices(value: int, expected_indices_values: list[int]) -> None: - """Tests the `get_branch_indices` method.""" - index = GeneralizedIndex(value=value) - branch_indices = index.get_branch_indices() - - # Extract the integer values from the result for comparison - result_values = [idx.value for idx in branch_indices] - - assert result_values == expected_indices_values - - -@pytest.mark.parametrize( - "value, expected_indices_values", - [ - (13, [13, 6, 3]), - (8, [8, 4, 2]), - (15, [15, 7, 3]), - (2, [2]), - (1, []), # Root node has no path (excluding itself) - ], -) -def test_get_path_indices(value: int, expected_indices_values: list[int]) -> None: - """Tests the `get_path_indices` method.""" - index = GeneralizedIndex(value=value) - path_indices = index.get_path_indices() - - # Extract the integer values from the result for comparison - result_values = [idx.value for idx in path_indices] - - assert result_values == expected_indices_values - - -@pytest.mark.parametrize( - "start_value, right_side, expected_child_value", - [ - # Children of the root (node 1) - (1, False, 2), - (1, True, 3), - # Children of a deeper left node (node 6) - (6, False, 12), - (6, True, 13), - # Children of a deeper right node (node 7) - (7, False, 14), - (7, True, 15), - ], -) -def test_child_method(start_value: int, right_side: bool, expected_child_value: int) -> None: - """Tests the `child` method for both left and right children.""" - parent_index = GeneralizedIndex(value=start_value) - child_index = parent_index.child(right_side=right_side) - - assert isinstance(child_index, GeneralizedIndex) - assert child_index.value == expected_child_value diff --git a/tests/lean_spec/subspecs/ssz/test_merkle_proof.py b/tests/lean_spec/subspecs/ssz/test_merkle_proof.py deleted file mode 100644 index 169ab661..00000000 --- a/tests/lean_spec/subspecs/ssz/test_merkle_proof.py +++ /dev/null @@ -1,148 +0,0 @@ -"""Tests for the SSZ merkle proofs.""" - -import os -from typing import Dict - -import pytest -from typing_extensions import Any - -from lean_spec.subspecs.ssz.merkle_proof.gindex import GeneralizedIndex -from lean_spec.subspecs.ssz.merkle_proof.proof import MerkleProof, ProofHashes -from lean_spec.subspecs.ssz.merkle_proof.tree import build_merkle_tree -from lean_spec.subspecs.ssz.utils import get_power_of_two_ceil -from lean_spec.types.byte_arrays import Bytes32 - - -@pytest.fixture -def sample_tree_data() -> Dict[str, Any]: - """Provides a pre-computed Merkle tree and its components.""" - leaves = [Bytes32(os.urandom(32)) for _ in range(11)] - tree = build_merkle_tree(leaves) - root = tree[1] - bottom_layer_start_index = get_power_of_two_ceil(len(leaves)) - - return { - "leaves": leaves, - "tree": tree, - "root": root, - "bottom_layer_start_index": bottom_layer_start_index, - } - - -def test_merkle_proof_instantiation_valid(sample_tree_data: Dict[str, Any]) -> None: - """Tests that a MerkleProof can be created with valid data.""" - leaf = sample_tree_data["leaves"][0] - index = GeneralizedIndex(value=sample_tree_data["bottom_layer_start_index"]) - proof = MerkleProof(leaves=[leaf], indices=[index], proof_hashes=[]) - assert proof.leaves == [leaf] - - -def test_merkle_proof_instantiation_mismatched_lengths() -> None: - """Tests that instantiation fails if leaves and indices have different lengths.""" - leaf = Bytes32(os.urandom(32)) - index1 = GeneralizedIndex(value=8) - index2 = GeneralizedIndex(value=9) - with pytest.raises(ValueError, match="The number of leaves must match the number of indices."): - MerkleProof(leaves=[leaf], indices=[index1, index2], proof_hashes=[]) - - -def test_from_single_leaf_factory(sample_tree_data: Dict[str, Any]) -> None: - """Tests the `from_single_leaf` class method factory.""" - leaf = sample_tree_data["leaves"][0] - index = GeneralizedIndex(value=sample_tree_data["bottom_layer_start_index"]) - proof_hashes = [Bytes32(os.urandom(32))] * index.depth - - proof = MerkleProof.from_single_leaf(leaf, proof_hashes, index) - assert proof.leaves == [leaf] - assert proof.indices == [index] - assert proof.proof_hashes == proof_hashes - - -@pytest.mark.parametrize("leaf_index_to_test", [0, 3, 10]) -def test_single_leaf_proof_verification( - sample_tree_data: Dict[str, Any], leaf_index_to_test: int -) -> None: - """Tests calculation and verification of a valid single-leaf proof.""" - tree = sample_tree_data["tree"] - root = sample_tree_data["root"] - gindex = GeneralizedIndex( - value=sample_tree_data["bottom_layer_start_index"] + leaf_index_to_test - ) - leaf = sample_tree_data["leaves"][leaf_index_to_test] - - branch_indices = gindex.get_branch_indices() - proof_hashes = [tree[i.value] for i in branch_indices] - - proof = MerkleProof.from_single_leaf(leaf, proof_hashes, gindex) - - assert proof.calculate_root() == root - assert proof.verify(root) is True - assert proof.verify(Bytes32(os.urandom(32))) is False - - -def test_single_leaf_invalid_proof_length(sample_tree_data: Dict[str, Any]) -> None: - """Tests that verification fails for a proof with an incorrect length.""" - root = sample_tree_data["root"] - gindex = GeneralizedIndex(value=sample_tree_data["bottom_layer_start_index"] + 2) - leaf = sample_tree_data["leaves"][2] - - # Create a proof that is too short - proof_hashes = [Bytes32(os.urandom(32))] * (gindex.depth - 1) - proof = MerkleProof.from_single_leaf(leaf, proof_hashes, gindex) - - assert proof.verify(root) is False - with pytest.raises(ValueError, match="Proof length must match the depth of the index."): - proof.calculate_root() - - -@pytest.mark.parametrize( - "leaf_indices_to_test", - [ - [2, 3], # Siblings - [5, 6], # Adjacent cousins - [0, 7], # Distant cousins - [1, 5, 10], # Three distant leaves - ], -) -def test_multi_leaf_proof_verification( - sample_tree_data: Dict[str, Any], leaf_indices_to_test: list[int] -) -> None: - """Tests calculation and verification of valid multi-leaf proofs.""" - tree = sample_tree_data["tree"] - root = sample_tree_data["root"] - leaves = [sample_tree_data["leaves"][i] for i in leaf_indices_to_test] - indices = [ - GeneralizedIndex(value=sample_tree_data["bottom_layer_start_index"] + i) - for i in leaf_indices_to_test - ] - - # Create a temporary proof object just to calculate the helper indices - temp_proof = MerkleProof(leaves=leaves, indices=indices, proof_hashes=[]) - helper_indices = temp_proof._get_helper_indices() - proof_hashes = [tree[i.value] for i in helper_indices] - - # Create the final, valid proof object - proof = MerkleProof(leaves=leaves, indices=indices, proof_hashes=proof_hashes) - - assert proof.calculate_root() == root - assert proof.verify(root) is True - assert proof.verify(Bytes32(os.urandom(32))) is False - - -def test_multi_leaf_invalid_proof_length(sample_tree_data: Dict[str, Any]) -> None: - """Tests that multi-proof verification fails if the number of proof hashes is wrong.""" - root = sample_tree_data["root"] - leaves = [sample_tree_data["leaves"][i] for i in [2, 3]] - indices = [ - GeneralizedIndex(value=sample_tree_data["bottom_layer_start_index"] + i) for i in [2, 3] - ] - - # Provide a proof with one too few hashes - proof_hashes: ProofHashes = [] - proof = MerkleProof(leaves=leaves, indices=indices, proof_hashes=proof_hashes) - - assert proof.verify(root) is False - with pytest.raises( - ValueError, match="Proof length does not match the required number of helper nodes." - ): - proof.calculate_root() diff --git a/tests/lean_spec/subspecs/ssz/test_tree.py b/tests/lean_spec/subspecs/ssz/test_tree.py deleted file mode 100644 index 55777a50..00000000 --- a/tests/lean_spec/subspecs/ssz/test_tree.py +++ /dev/null @@ -1,108 +0,0 @@ -"""Tests for the build_merkle_tree function.""" - -import os - -import pytest -from typing_extensions import List - -from lean_spec.subspecs.ssz.merkle_proof.tree import build_merkle_tree -from lean_spec.subspecs.ssz.utils import hash_nodes -from lean_spec.types import ZERO_HASH -from lean_spec.types.byte_arrays import Bytes32 - -# Create some deterministic leaves for predictable results -LEAF_A = Bytes32(b"\xaa" * 32) -LEAF_B = Bytes32(b"\xbb" * 32) -LEAF_C = Bytes32(b"\xcc" * 32) -LEAF_D = Bytes32(b"\xdd" * 32) - - -def test_build_merkle_tree_empty() -> None: - """ - Tests that building a tree with no leaves returns two zero hashes. - - This corresponds to a tree with a single zero leaf, resulting in a zero root. - """ - expected_tree = [ZERO_HASH, ZERO_HASH] - assert build_merkle_tree([]) == expected_tree - - -def test_build_merkle_tree_single_leaf() -> None: - """ - Tests that a tree with a single leaf has the leaf as its root. - Note: The 0-index is a placeholder. - """ - # Assuming the placeholder at index 0 is ZERO_HASH, not Bytes32.zero() - expected_tree = [ZERO_HASH, LEAF_A] - assert build_merkle_tree([LEAF_A]) == expected_tree - - -def test_build_merkle_tree_two_leaves() -> None: - """ - Tests a perfectly balanced tree with two leaves (a power of 2). - """ - root = hash_nodes(LEAF_A, LEAF_B) - expected_tree = [ - ZERO_HASH, # Placeholder at index 0 - root, # Root at index 1 - LEAF_A, # Leaf at index 2 - LEAF_B, # Leaf at index 3 - ] - assert build_merkle_tree([LEAF_A, LEAF_B]) == expected_tree - - -def test_build_merkle_tree_three_leaves() -> None: - """ - Tests a tree with a number of leaves that is not a power of 2, - requiring padding with a ZERO_HASH. - """ - # Bottom layer: [LEAF_A, LEAF_B, LEAF_C, ZERO_HASH] - parent_ab = hash_nodes(LEAF_A, LEAF_B) - parent_c_zero = hash_nodes(LEAF_C, ZERO_HASH) - root = hash_nodes(parent_ab, parent_c_zero) - - expected_tree = [ - ZERO_HASH, # Placeholder at index 0 - root, # Root at index 1 - parent_ab, # Parent at index 2 - parent_c_zero, # Parent at index 3 - LEAF_A, - LEAF_B, - LEAF_C, - ZERO_HASH, # Padding at index 7 - ] - assert build_merkle_tree([LEAF_A, LEAF_B, LEAF_C]) == expected_tree - - -@pytest.mark.parametrize("num_leaves", [5, 11, 16]) -def test_build_merkle_tree_larger_trees(num_leaves: int) -> None: - """ - Tests larger trees by verifying the root and structure without - checking every intermediate node by hand. - """ - leaves = [Bytes32(os.urandom(32)) for _ in range(num_leaves)] - tree = build_merkle_tree(leaves) - - # Check tree length - # - # A full tree is twice the size of its padded bottom layer. - expected_bottom_size = 1 - while expected_bottom_size < num_leaves: - expected_bottom_size *= 2 - assert len(tree) == expected_bottom_size * 2 - - # Check that the original leaves are correctly placed - assert tree[expected_bottom_size : expected_bottom_size + num_leaves] == leaves - - # Manually re-calculate and verify the root - # - # This ensures the hashing logic is correct throughout the tree. - layer = list(leaves) + [ZERO_HASH] * (expected_bottom_size - num_leaves) - while len(layer) > 1: - new_layer: List[Bytes32] = [] - for i in range(0, len(layer), 2): - new_layer.append(hash_nodes(layer[i], layer[i + 1])) - layer = new_layer - - expected_root = layer[0] - assert tree[1] == expected_root