Use Sessions to send messages #583

Merged
jeandemeusy merged 126 commits into main from jean/hoprd-2-2-compliance
May 12, 2025

Conversation

@jeandemeusy
Collaborator

@jeandemeusy jeandemeusy commented Nov 19, 2024

HOPRd 2.2 introduced some changes that are worth taking advantage of.

The main one is the introduction of sessions.
CT now opens a session from each CT node to each eligible peer and "throws" packets into the socket defined by the port returned by the Session endpoint.
To send data to each peer through its own session and socket port, each node manages a dictionary of {peer_id: socket}.
When sending a message, if the dictionary already has an entry for the target peer, the node uses the associated socket; otherwise it initiates a new session and automatically creates a socket on the returned port. Messages are then sent as bytes through the socket.
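
The per-peer socket bookkeeping described above can be sketched as follows. This is a minimal illustration, not the code from this PR: `SocketRegistry`, `open_session` and `close_all` are hypothetical names, and `open_session` stands in for the call to the Session endpoint, assumed here to return only a port.

```python
import socket
from typing import Callable, Dict, Tuple


class SocketRegistry:
    """Hypothetical helper: one UDP socket per peer, created on first use."""

    def __init__(self, open_session: Callable[[str], int], host: str = "127.0.0.1"):
        # open_session stands in for the Session endpoint call and returns a port
        self._open_session = open_session
        self._host = host
        self._sockets: Dict[str, Tuple[socket.socket, int]] = {}

    def send(self, peer_id: str, payload: bytes) -> int:
        # Reuse the peer's socket if a session already exists, else open one
        if peer_id not in self._sockets:
            port = self._open_session(peer_id)
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._sockets[peer_id] = (sock, port)
        sock, port = self._sockets[peer_id]
        return sock.sendto(payload, (self._host, port))

    def close_all(self) -> None:
        # Close every socket and forget the associated sessions
        for sock, _ in self._sockets.values():
            sock.close()
        self._sockets.clear()
```

Keying the dictionary by peer id means the session-opening call happens at most once per peer for as long as the entry stays in the dictionary.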

A second, minor feature is the introduction of the win_probability endpoint. Instead of using a predefined value from the config file (set to 1 since the start of CT), CT now retrieves this value from the CT nodes through the API, in the same way the ticket price is retrieved.

Also, a more automated way to (de)serialize API objects has been added, since CT no longer uses the HOPRd SDK.

Summary by CodeRabbit

  • New Features

    • Introduced session-based communication enabling creation, retrieval, and closure of TCP/UDP sessions.
    • Added a new socket communication layer for session-based message sending and receiving.
    • Added scripts for testing session lifecycle and closing all active sessions asynchronously.
    • Added pattern matching utility for flexible regex-based string parsing.
  • Enhancements

    • Refined message formatting with size enforcement and enhanced message relay logic.
    • Improved node initialization, session lifecycle management, and cleanup procedures.
    • Expanded configuration options for session management and message observation.
    • Added dynamic discovery of decorated tasks for better task management.
    • Updated protocol handling with explicit retransmission and segmentation properties.
    • Restructured API request and response models to support session operations.
    • Enhanced logging consistency and added operational metrics for sessions and channels.
    • Updated test configurations and removed obsolete test cases for streamlined testing.
    • Improved asynchronous task gathering with flexible methods.
    • Simplified logging output formatting for clarity.
  • Bug Fixes

    • Fixed type handling in configuration parsing to prevent errors with price values.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

🔭 Outside diff range comments (1)
ct-app/core/components/session_to_socket.py (1)

102-129: 🛠️ Refactor suggestion

Add error handling for socket operations in send_and_receive

The send_and_receive method lacks comprehensive error handling for socket operations.

Enhance error handling to capture and log potential exceptions:

    async def send_and_receive(self, message: MessageFormat) -> float:
        # TODO: maybe set the timestamp here ?

-        sent_size = self.send(message.bytes())
-        recv_message, recv_size, timestamp = self.receive(sent_size)
+        try:
+            sent_size = self.send(message.bytes())
+            recv_message, recv_size, timestamp = self.receive(sent_size)
+        except socket.error as e:
+            # Log the error but don't propagate it to avoid disrupting the message flow
+            import logging
+            logging.error(f"Socket error during send/receive: {e}")
+            return 0

        if recv_message is None:
            return 0

        try:
            message = MessageFormat.parse(recv_message)
        except ValueError:
            return 0

        rtt = (timestamp - message.timestamp) / 1000

        # convert to number of messages instead of bytes
        sent_count = sent_size / message.multiplier
        recv_count = recv_size / message.multiplier

-        MESSAGES_STATS.labels("sent", message.sender, message.relayer).inc(sent_count)
-        MESSAGES_STATS.labels("relayed", message.sender, message.relayer).inc(
-            recv_count
-        )
-        MESSAGES_DELAYS.labels(message.sender, message.relayer).observe(rtt)
+        try:
+            MESSAGES_STATS.labels("sent", message.sender, message.relayer).inc(sent_count)
+            MESSAGES_STATS.labels("relayed", message.sender, message.relayer).inc(
+                recv_count
+            )
+            MESSAGES_DELAYS.labels(message.sender, message.relayer).observe(rtt)
+        except Exception as e:
+            # Don't let metrics failures disrupt the message flow
+            import logging
+            logging.error(f"Error updating metrics: {e}")

        return recv_size / sent_size
♻️ Duplicate comments (6)
ct-app/core/api/protocol.py (3)

24-32: 🛠️ Refactor suggestion

Improve error handling in fromString method

The error handling in this method doesn't use proper exception chaining, which makes debugging more difficult.

Use exception chaining to preserve the original error context:

    @classmethod
-    def fromString(cls, protocol: str):
+    def fromString(cls, protocol: str) -> 'Protocol':
         try:
             return getattr(cls, protocol.upper())
         except AttributeError as err:
             raise ValueError(
                 f"Invalid protocol: {protocol}. Valid values are: {[p.name for p in cls]}"
-            )
+            ) from err
🧰 Tools
🪛 Ruff (0.8.2)

29-31: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)
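
As a small illustration of what the B904 rule asks for, the sketch below chains the lookup error into the `ValueError`. The enum members and their values are placeholders (the real `Protocol` enum may carry richer values), and `from_string` and `cause_of` are hypothetical names used only for this example.

```python
from enum import Enum


class Protocol(Enum):
    # Placeholder values, chosen only for this illustration
    TCP = "tcp"
    UDP = "udp"

    @classmethod
    def from_string(cls, protocol: str) -> "Protocol":
        try:
            # Look the member up by name; unknown names raise KeyError
            return cls[protocol.upper()]
        except KeyError as err:
            valid = [p.name for p in cls]
            # "from err" keeps the original KeyError attached as __cause__
            raise ValueError(
                f"Invalid protocol: {protocol}. Valid values are: {valid}"
            ) from err


def cause_of(protocol: str):
    """Return the chained cause of a failed lookup, or None on success."""
    try:
        Protocol.from_string(protocol)
    except ValueError as exc:
        return exc.__cause__
    return None
```

With the chain in place, the traceback shows the original lookup failure as the direct cause of the `ValueError` rather than as an error that occurred while handling another one.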


1-3: 🛠️ Refactor suggestion

Missing decorator and type imports

The Protocol enum is missing the @unique decorator and required type imports that were identified in previous reviews.

Update the imports to include the unique decorator and Union type:

-from enum import Enum
+from enum import Enum, unique
+from typing import Union

41-45: ⚠️ Potential issue

Fix infinite recursion risk in equality method

The equality method implementation risks infinite recursion when comparing with another Protocol instance.

Improve the implementation with type hints and fix the recursive self.name access:

-    def __eq__(self, other):
+    def __eq__(self, other: Union[str, 'Protocol', None]) -> bool:
+        if other is None:
+            return False
         if isinstance(other, str):
-            return other.lower() == self.name.lower()
+            return other.lower() == super().name.lower()
 
-        return self.name == other.name
+        if isinstance(other, Protocol):
+            return super().name == other.value.segment
+        return False
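
For comparison, here is a minimal sketch of a string-tolerant equality on an enum, assuming placeholder member values. It reads `self.name` directly, which does not itself call `__eq__`, and it defines `__hash__` explicitly because a class that overrides `__eq__` without `__hash__` becomes unhashable.

```python
from enum import Enum, unique


@unique
class Protocol(Enum):
    # Placeholder values, chosen only for this illustration
    TCP = "tcp"
    UDP = "udp"

    def __eq__(self, other: object) -> bool:
        if isinstance(other, str):
            # Case-insensitive comparison against the member name
            return other.lower() == self.name.lower()
        if isinstance(other, Protocol):
            # Enum members are singletons, so identity is sufficient
            return self is other
        return NotImplemented

    def __hash__(self) -> int:
        # Keep members usable as dict keys and set elements
        return hash(self.name)
```

Returning `NotImplemented` for unrelated types lets Python fall back to its default comparison, so `Protocol.UDP == 5` evaluates to `False` instead of raising.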
ct-app/core/node.py (2)

477-487: 🛠️ Refactor suggestion

Add error handling for session creation

The open_session method lacks error handling for the session creation process.

Add error handling to gracefully handle session creation failures:

    async def open_session(self, relayer: str):
-        if session := await NodeHelper.open_session(
-            self.address,
-            self.api,
-            relayer,
-            self.p2p_endpoint,
-        ):
-            self.session_management[relayer] = SessionToSocket(
-                session, self.p2p_endpoint
-            )
+        try:
+            if session := await NodeHelper.open_session(
+                self.address,
+                self.api,
+                relayer,
+                self.p2p_endpoint,
+            ):
+                self.session_management[relayer] = SessionToSocket(
+                    session, self.p2p_endpoint
+                )
+            else:
+                logger.warning(
+                    f"Failed to open session for {relayer}",
+                    self.log_base_params
+                )
+        except Exception as e:
+            logger.error(
+                f"Error creating session for {relayer}",
+                {"error": str(e), **self.log_base_params}
+            )

423-441: 🛠️ Refactor suggestion

Improve session cleanup to prevent race conditions

The current implementation modifies the dictionary while iterating through it and lacks error handling for socket operations.

Refactor to safely remove items and handle errors:

    @master(flagguard, formalin, connectguard)
    async def close_sessions(self):
        active_sessions = await self.api.get_sessions(Protocol.UDP)

-        to_remove = [
-            peer_id
-            for peer_id, s in self.session_management.items()
-            if s.session not in active_sessions
-        ]
+        to_remove = []
+        for peer_id, s in self.session_management.items():
+            if s.session not in active_sessions:
+                try:
+                    # Store session to close it later
+                    to_remove.append((peer_id, s.session))
+                except Exception as e:
+                    logger.error(
+                        f"Error preparing to close session for {peer_id}",
+                        {"error": str(e), **self.log_base_params}
+                    )

-        for peer_id in to_remove:
+        for peer_id, session in to_remove:
+            # Remove from dictionary before async operation
+            if peer_id in self.session_management:
+                session_obj = self.session_management.pop(peer_id)
+                AsyncLoop.add(
+                    NodeHelper.close_session,
+                    self.address,
+                    self.api,
+                    peer_id,
+                    session_obj,
+                    publish_to_task_set=False,
+                )
-            AsyncLoop.add(
-                NodeHelper.close_session,
-                self.address,
-                self.api,
-                peer_id,
-                self.session_management.pop(peer_id),
-                publish_to_task_set=False,
-            )
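
The snapshot-then-pop pattern at stake here can be shown in isolation. This is a generic sketch with a hypothetical helper name (`pop_stale`) and plain values standing in for session objects: the stale keys are collected into a list first, so the dictionary is never resized while it is being iterated.

```python
from typing import Dict, Hashable, List, Set, Tuple


def pop_stale(
    sessions: Dict[str, Hashable], active: Set[Hashable]
) -> List[Tuple[str, Hashable]]:
    """Remove and return every entry whose session is no longer active."""
    # Step 1: snapshot the keys to remove while only reading the dict
    stale = [peer_id for peer_id, session in sessions.items() if session not in active]
    # Step 2: mutate the dict, iterating over the snapshot rather than the dict
    return [(peer_id, sessions.pop(peer_id)) for peer_id in stale]
```

Each popped entry is returned together with its peer id, so the caller can schedule the actual close operation after the dictionary is already consistent.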
ct-app/core/components/session_to_socket.py (1)

22-33: 🛠️ Refactor suggestion

Fix exception chaining in constructor

The constructor should use exception chaining to preserve the original error context.

Add proper exception chaining:

        try:
            self.socket, self.conn = self.create_socket(timeout)
        except (socket.error, ValueError) as e:
-            raise ValueError(f"Error while creating socket: {e}")
+            raise ValueError(f"Error while creating socket: {e}") from e
🧰 Tools
🪛 Ruff (0.8.2)

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🧹 Nitpick comments (2)
ct-app/core/components/session_to_socket.py (2)

62-77: Remove unused connection variable

The conn variable is initialized but never used.

Simplify the code by removing the unused variable:

    def create_socket(self, timeout: Optional[int]):
        if self.session.protocol == Protocol.UDP:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        else:
            raise ValueError(f"Invalid protocol: {self.session.protocol}")

        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUF_SIZE)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUF_SIZE)

        if timeout is not None:
            s.settimeout(timeout)

-        conn = None
-
-        return s, conn
+        return s, None
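
A standalone version of this socket setup, written as a free function for illustration, might look like the sketch below. `create_udp_socket` is a hypothetical name, `BUF_SIZE` mirrors the module constant shown in this review, and the timeout is typed as `Optional[float]` on the assumption that fractional seconds are intended.

```python
import socket
from typing import Optional

BUF_SIZE = 8192


def create_udp_socket(timeout: Optional[float]) -> socket.socket:
    """Create a UDP socket with fixed buffer sizes and an optional timeout."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Request send/receive buffer sizes (the OS may round or adjust them)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUF_SIZE)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUF_SIZE)
    if timeout is not None:
        # Blocking calls raise a timeout error after this many seconds
        s.settimeout(timeout)
    return s
```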

87-101: Update docstring to match implementation

The docstring doesn't accurately describe the method's return value.

Update the docstring to match the implementation:

    def receive(self, size: int) -> tuple[Optional[str], int, Optional[int]]:
        """
-        Receives data from the peer. In case off multiple message in the same packet, which should
-        not happen, they are already split and returned as a list.
+        Receives data from the peer.
+        
+        Args:
+            size (int): Maximum size of data to receive
+            
+        Returns:
+            tuple: (decoded_data, data_size, timestamp) where:
+                - decoded_data (Optional[str]): Decoded data or None on timeout
+                - data_size (int): Size of received data in bytes
+                - timestamp (Optional[int]): Reception timestamp in milliseconds or None on timeout
+                
+        Raises:
+            ValueError: If protocol is not UDP
         """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a361bbf and cb1b160.

📒 Files selected for processing (11)
  • ct-app/.configs/core_prod_config.yaml (4 hunks)
  • ct-app/.configs/core_staging_config.yaml (6 hunks)
  • ct-app/core/api/hoprd_api.py (3 hunks)
  • ct-app/core/api/protocol.py (1 hunks)
  • ct-app/core/components/messages/message_format.py (3 hunks)
  • ct-app/core/components/peer.py (1 hunks)
  • ct-app/core/components/session_to_socket.py (1 hunks)
  • ct-app/core/core.py (4 hunks)
  • ct-app/core/node.py (8 hunks)
  • ct-app/scripts/test_sessions.py (1 hunks)
  • ct-app/test/components/test_messages.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • ct-app/.configs/core_prod_config.yaml
  • ct-app/core/components/peer.py
  • ct-app/scripts/test_sessions.py
  • ct-app/core/core.py
  • ct-app/core/components/messages/message_format.py
  • ct-app/test/components/test_messages.py
  • ct-app/core/api/hoprd_api.py
🧰 Additional context used
🧠 Learnings (1)
ct-app/core/node.py (2)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:416-416
Timestamp: 2025-04-24T13:17:55.669Z
Learning: In the `SessionToSocket` class, the `receive()` method already decodes the bytes data and returns a string, so calling `.decode()` on the return value would be redundant and cause an error.
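
To make the second learning concrete, here is a hedged sketch of a receive helper that already returns a decoded string. It is not the `SessionToSocket.receive()` implementation; the free-function form and the return convention on timeout, `(None, 0, None)`, are assumptions made for this example.

```python
import socket
import time
from typing import Optional, Tuple


def receive(sock: socket.socket, size: int) -> Tuple[Optional[str], int, Optional[int]]:
    """Read one datagram; return (text, size_in_bytes, timestamp_ms)."""
    try:
        data, _ = sock.recvfrom(size)
    except socket.timeout:
        # Nothing arrived within the socket timeout
        return None, 0, None
    timestamp_ms = int(time.time() * 1000)
    # The bytes are decoded here, so callers receive a str
    return data.decode(), len(data), timestamp_ms
```

Because the helper returns a `str`, calling `.decode()` on its first return value would fail with an `AttributeError`.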
🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

ct-app/core/api/protocol.py

29-31: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (2)
ct-app/core/node.py (2)

488-494: Good dynamic task discovery implementation

The tasks property dynamically discovers decorated methods, making it easier to maintain and extend the task list without manual updates.

This is a clean implementation that eliminates the need to manually update the tasks list when adding new methods.


495-520: Well-designed p2p endpoint resolution with fallback

The p2p_endpoint property effectively uses pattern matching to derive endpoints with a sensible fallback mechanism.

This implementation gracefully handles different URL patterns and provides clear logging when falling back to the default URL.

Comment thread ct-app/.configs/core_staging_config.yaml
Comment thread ct-app/.configs/core_staging_config.yaml
Comment thread ct-app/core/api/protocol.py
Comment thread ct-app/core/node.py
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
ct-app/core/components/session_to_socket.py (1)

102-129: 💡 Verification agent

🧩 Analysis chain

Document the magic number and resolve TODO comment in send_and_receive method.

The method uses a magic number (476) for calculating message counts without explanation, and there's an unresolved TODO comment.

     async def send_and_receive(self, message: MessageFormat) -> float:
-        # TODO: maybe set the timestamp here ?
+        """
+        Sends a message and waits for a response, tracking metrics for the exchange.
+        
+        Args:
+            message: The formatted message to send
+            
+        Returns:
+            float: The ratio of received bytes to sent bytes, or 0 if no response
+        """
+        # Message timestamp is already set in MessageFormat

         sent_size = self.send(message.bytes())
         recv_message, recv_size, timestamp = self.receive(sent_size)

         if recv_message is None:
             return 0

         try:
             message = MessageFormat.parse(recv_message)
         except ValueError:
             return 0

         rtt = (timestamp - message.timestamp) / 1000

         # convert to number of messages instead of bytes
+        # 476 bytes is the expected size of a typical message in this protocol
         sent_count = sent_size / 476
         recv_count = recv_size / 476

Consider extracting the magic number to a named constant at the top of the file:

 BUF_SIZE = 8192
+# Average size of a formatted message in bytes
+MESSAGE_SIZE = 476

Then updating the usage:

         # convert to number of messages instead of bytes
-        sent_count = sent_size / 476
-        recv_count = recv_size / 476
+        sent_count = sent_size / MESSAGE_SIZE
+        recv_count = recv_size / MESSAGE_SIZE

🏁 Script executed:

#!/bin/bash
# Verify if the message size of 476 bytes is documented elsewhere in the codebase
echo "Searching for references to message size in bytes..."
rg -i "message.*size.*476|476.*message.*size" --type py

Length of output: 172


🏁 Script executed:

#!/bin/bash
# Locate the definition of send_and_receive to confirm the file path
echo "Locating send_and_receive definition..."
rg -n "async def send_and_receive" --type py

# Search for any other uses of the literal 476 across the Python codebase
echo "Searching for occurrences of '476' in Python files..."
rg -n "476" --type py

Length of output: 569


🏁 Script executed:

#!/bin/bash
# Locate the MessageFormat class to inspect timestamp handling
echo "Locating MessageFormat definition..."
rg -n "class MessageFormat" --type py

# Show context around MessageFormat methods to find timestamp assignment
echo "Inspecting MessageFormat for timestamp field usage..."
rg -n "timestamp" -C3 --type py

Length of output: 7515


Document magic number and remove outdated TODO in send_and_receive

MessageFormat already initializes its own timestamp, so the inline TODO can be cleared. Extract the “476” magic number into a named constant and update usages accordingly.

• File: ct-app/core/components/session_to_socket.py
– At the top of the module, add:

    # Average size of a formatted message in bytes
    MESSAGE_SIZE = 476

– In send_and_receive, replace the TODO with a docstring and switch to the new constant:

--- a/ct-app/core/components/session_to_socket.py
+++ b/ct-app/core/components/session_to_socket.py
@@ -1,3 +1,6 @@
+# Average size of a formatted message in bytes
+MESSAGE_SIZE = 476
+
 
     async def send_and_receive(self, message: MessageFormat) -> float:
-        # TODO: maybe set the timestamp here ?
+        """
+        Send a formatted message (timestamp set in constructor), await response, and track metrics.
+
+        Returns the ratio of received bytes to sent bytes, or 0 if no valid response.
+        """
 
         sent_size = self.send(message.bytes())
         recv_message, recv_size, timestamp = self.receive(sent_size)
@@ -116,8 +119,8 @@
         rtt = (timestamp - message.timestamp) / 1000
 
-        # convert to number of messages instead of bytes
-        sent_count = sent_size / 476
-        recv_count = recv_size / 476
+        # convert bytes to message count for Prometheus metrics
+        sent_count = sent_size / MESSAGE_SIZE
+        recv_count = recv_size / MESSAGE_SIZE
 
         MESSAGES_STATS.labels("sent", message.sender, message.relayer).inc(sent_count)
         …
🧹 Nitpick comments (5)
ct-app/core/components/session_to_socket.py (5)

29-32: Use explicit exception chaining.

When re-raising exceptions, use explicit exception chaining with from e to preserve the original traceback information, which aids in debugging.

        try:
            self.socket, self.conn = self.create_socket(timeout)
        except (socket.error, ValueError) as e:
-            raise ValueError(f"Error while creating socket: {e}")
+            raise ValueError(f"Error while creating socket: {e}") from e
🧰 Tools
🪛 Ruff (0.8.2)

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


62-77: Improve socket creation method documentation and remove unused variable.

The method creates and configures a UDP socket but initializes an unused conn variable and has incomplete documentation.

-    def create_socket(self, timeout: Optional[int]):
+    def create_socket(self, timeout: Optional[int]) -> tuple[socket.socket, None]:
+        """
+        Creates and configures a UDP socket with the specified buffer sizes and timeout.
+        
+        Args:
+            timeout: Optional timeout in seconds for socket operations
+            
+        Returns:
+            A tuple of (socket, None) where socket is the configured UDP socket
+            
+        Raises:
+            ValueError: If protocol is not UDP
+        """
         if self.session.protocol == Protocol.UDP:
             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         else:
             raise ValueError(f"Invalid protocol: {self.session.protocol}")

         s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUF_SIZE)
         s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUF_SIZE)

         if timeout is not None:
             s.settimeout(timeout)

-        conn = None
-
-        return s, conn
+        return s, None

24-24: Fix timeout parameter type annotation.

The timeout parameter is annotated as Optional[int] but the default value is a float (0.05). This creates an inconsistency between the type hint and the actual value.

-        self, session: Session, connect_address: str, timeout: Optional[int] = 0.05
+        self, session: Session, connect_address: str, timeout: Optional[float] = 0.05

62-62: Update timeout parameter type in create_socket method.

For consistency with the constructor, update the timeout parameter type.

-    def create_socket(self, timeout: Optional[int]):
+    def create_socket(self, timeout: Optional[float]):

11-20: Add description to MESSAGES_STATS metric.

The MESSAGES_STATS Gauge metric has an empty description string, which makes its purpose unclear. Add a meaningful description.

 BUF_SIZE = 8192

 MESSAGES_DELAYS = Histogram(
     "ct_messages_delays",
     "Messages delays",
     ["sender", "relayer"],
     buckets=[0.025, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 2.5],
 )
-MESSAGES_STATS = Gauge("ct_messages_stats", "", ["type", "sender", "relayer"])
+MESSAGES_STATS = Gauge("ct_messages_stats", "Message statistics by type, sender and relayer", ["type", "sender", "relayer"])
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cb1b160 and c03550f.

📒 Files selected for processing (1)
  • ct-app/core/components/session_to_socket.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (2)
ct-app/core/node.py (2)

477-487: 🛠️ Refactor suggestion

Add error handling for session opening.

The open_session method doesn't handle potential exceptions from the session creation or socket connection process.

async def open_session(self, relayer: str):
-    if session := await NodeHelper.open_session(
-        self.address,
-        self.api,
-        relayer,
-        self.p2p_endpoint,
-    ):
-        self.session_management[relayer] = SessionToSocket(
-            session, self.p2p_endpoint
-        )
+    try:
+        if session := await NodeHelper.open_session(
+            self.address,
+            self.api,
+            relayer,
+            self.p2p_endpoint,
+        ):
+            try:
+                self.session_management[relayer] = SessionToSocket(
+                    session, self.p2p_endpoint
+                )
+                logger.info(
+                    f"Opened session to peer {relayer}",
+                    {"peer_id": relayer, **self.log_base_params},
+                )
+            except Exception as e:
+                logger.error(
+                    f"Error creating socket for peer {relayer}",
+                    {"error": str(e), **self.log_base_params},
+                )
+    except Exception as e:
+        logger.error(
+            f"Error opening session for peer {relayer}",
+            {"error": str(e), **self.log_base_params},
+        )

398-421: 🛠️ Refactor suggestion

Add error handling for session message sending.

The observe_message_queue method lacks error handling for the message sending operation. If the socket operation fails, it could crash the loop.

    message.sender = self.address.hopr
-    for _ in range(self.params.sessions.batchSize):
-        AsyncLoop.add(
-            self.session_management[message.relayer].send_and_receive,
-            message,
-            publish_to_task_set=False,
-        )
-        message.increase_inner_index()
+    try:
+        for _ in range(self.params.sessions.batchSize):
+            AsyncLoop.add(
+                self.session_management[message.relayer].send_and_receive,
+                message,
+                publish_to_task_set=False,
+            )
+            message.increase_inner_index()
+    except Exception as e:
+        logger.error(
+            f"Error sending message to {message.relayer}",
+            {"error": str(e), **self.log_base_params},
+        )
🧹 Nitpick comments (11)
ct-app/.configs/core_prod_config.yaml (2)

118-121: Document the rationale for the session parameters.

The new sessions section introduces several parameters without explaining their purpose or constraints:

  • Why is packetSize: 462 chosen and why will it change to 762?
  • What's the significance of aggregatedPackets: 10?
  • How was batchSize: 5 determined?

Add documentation comments to explain these values:

sessions:
-  packetSize: 462 # TODO 3.0: change to 762
+  packetSize: 462 # Size in bytes for each packet (TODO 3.0: change to 762 due to improved MTU handling)
-  aggregatedPackets: 10
+  aggregatedPackets: 10 # Number of packets to aggregate for receive buffer sizing
-  batchSize: 5
+  batchSize: 5 # Number of message copies to send per request for reliability

116-116: Fix trailing whitespace.

The line contains trailing spaces that should be removed.

-# 
+#
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 116-116: trailing spaces

(trailing-spaces)

ct-app/.configs/core_staging_config.yaml (1)

117-117: Fix trailing whitespace.

The line contains trailing spaces that should be removed.

-# 
+#
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 117-117: trailing spaces

(trailing-spaces)

ct-app/core/components/messages/message_format.py (1)

76-84: Fix typo in error message and enhance bytes method.

There's a typo in the error message and the method could be more efficient.

if len(message_as_bytes) > self.size:
    raise ValueError(
-        f"Encoded message is exceeds specified size ({len(message_as_bytes)} > {self.size})"
+        f"Encoded message exceeds specified size ({len(message_as_bytes)} > {self.size})"
    )
return message_as_bytes + b"\0" * (self.size - len(message_as_bytes))
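
The fixed-size padding used here can be illustrated on its own. In the sketch below, `pad` and `unpad` are hypothetical helpers, and stripping trailing null bytes on the receiving side is an assumption about how a padded message would be read back.

```python
def pad(message: str, size: int) -> bytes:
    """Encode a message and right-pad it with null bytes up to `size`."""
    raw = message.encode()
    if len(raw) > size:
        raise ValueError(
            f"Encoded message exceeds specified size ({len(raw)} > {size})"
        )
    return raw + b"\0" * (size - len(raw))


def unpad(data: bytes) -> str:
    """Drop the trailing null padding and decode the remaining bytes."""
    return data.rstrip(b"\0").decode()
```

Every padded message has exactly `size` bytes on the wire, which is what makes a bytes-to-message-count conversion by simple division possible.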
ct-app/core/node.py (2)

423-441: Improve session cleanup to avoid dictionary modification during iteration.

The close_sessions method creates a list of peer IDs to remove and then modifies the dictionary while iterating through this list. While this approach avoids direct modification during iteration, it could be improved with better error handling.

async def close_sessions(self):
    active_sessions = await self.api.get_sessions(Protocol.UDP)

-    to_remove = [
-        peer_id
-        for peer_id, s in self.session_management.items()
-        if s.session not in active_sessions
-    ]
+    to_remove = []
+    for peer_id, s in self.session_management.items():
+        if s.session not in active_sessions:
+            try:
+                s.socket.close()  # Close the socket before removing from dictionary
+                to_remove.append((peer_id, s.session))
+            except Exception as e:
+                logger.error(
+                    f"Error closing socket for peer {peer_id}",
+                    {"error": str(e), **self.log_base_params},
+                )
+                # Still add to removal list even if socket close fails
+                to_remove.append((peer_id, s.session))

-    for peer_id in to_remove:
+    for peer_id, session in to_remove:
+        # Remove from dictionary before async operation
+        self.session_management.pop(peer_id, None)
        AsyncLoop.add(
            NodeHelper.close_session,
            self.address,
            self.api,
            peer_id,
-            self.session_management.pop(peer_id),
+            session,
            publish_to_task_set=False,
        )

410-410: Fix unused loop variable warning.

Ruff static analysis shows that the relayer variable in the observe_message_queue method isn't used within the method.

-    for checklist in [peers, channels, self.session_management]:
+    for checklist in [peers, channels, list(self.session_management.keys())]:
        if message.relayer not in checklist:
            return
ct-app/core/components/session_to_socket.py (5)

24-25: Fix parameter type annotation for timeout.

The parameter timeout is annotated as Optional[int] but has a default value of 0.05 which is a float. This type mismatch could cause typing errors or confuse static analyzers.

    def __init__(
-        self, session: Session, connect_address: str, timeout: Optional[int] = 0.05
+        self, session: Session, connect_address: str, timeout: Optional[float] = 0.05
    ):

29-32: Improve exception handling with explicit exception chaining.

Use explicit exception chaining to preserve the original error context, which helps with debugging by maintaining the full traceback.

        try:
            self.socket, self.conn = self.create_socket(timeout)
        except (socket.error, ValueError) as e:
-            raise ValueError(f"Error while creating socket: {e}")
+            raise ValueError(f"Error while creating socket: {e}") from e
🧰 Tools
🪛 Ruff (0.8.2)

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


55-60: Add return type annotation to the address property.

For consistency with other methods and to improve type checking, add a return type annotation to the address property. Also, remove the unnecessary blank line.

    @property
-    def address(self):
+    def address(self) -> tuple[str, int]:
        """
        Returns the socket address tuple.
        """
-
        return (self.connect_address, self.session.port)

62-76: Improve socket creation method for clarity and type safety.

The create_socket method lacks a return type annotation and creates an unused conn variable. It should also have a more descriptive docstring about UDP-only support.

-    def create_socket(self, timeout: Optional[int]):
+    def create_socket(self, timeout: Optional[float]) -> tuple[socket.socket, None]:
+        """
+        Creates a UDP socket with the specified timeout.
+        
+        Args:
+            timeout: Socket timeout in seconds, or None for no timeout
+        
+        Returns:
+            tuple: (socket, None) tuple where socket is the UDP socket
+        
+        Raises:
+            ValueError: If protocol is not UDP
+        """
         if self.session.protocol == Protocol.UDP:
             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         else:
             raise ValueError(f"Invalid protocol: {self.session.protocol}")
 
         s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUF_SIZE)
         s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUF_SIZE)
 
         if timeout is not None:
             s.settimeout(timeout)
 
-        conn = None
-
-        return s, conn
+        return s, None
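
For reference, the suggested shape can be tried in isolation. This is a sketch only: `create_udp_socket` is a hypothetical free function, and the `BUF_SIZE` value is an assumption because the PR's constant is not shown in this thread.

```python
import socket
from typing import Optional

BUF_SIZE = 64 * 1024  # assumed value, for illustration only


def create_udp_socket(timeout: Optional[float] = 0.05) -> socket.socket:
    """Create a UDP socket with enlarged buffers and an optional timeout in seconds."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUF_SIZE)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUF_SIZE)
    if timeout is not None:
        # settimeout accepts a float, which is why Optional[float] is the
        # accurate annotation for a default such as 0.05.
        s.settimeout(timeout)
    return s
```

Passing `None` leaves the socket in blocking mode, which matches the "None for no timeout" wording in the proposed docstring.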

103-103: Address the TODO comment about timestamp setting.

There's an unresolved TODO comment about possibly setting the timestamp in the send_and_receive method. This should be addressed to ensure accurate timing measurements.

Would you like help implementing a solution for the timestamp setting? One approach would be to set the timestamp just before sending the message to ensure precise measurement of the round-trip time.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c03550f and 5b87b2f.

📒 Files selected for processing (6)
  • ct-app/.configs/core_prod_config.yaml (4 hunks)
  • ct-app/.configs/core_staging_config.yaml (6 hunks)
  • ct-app/core/components/messages/message_format.py (4 hunks)
  • ct-app/core/components/peer.py (1 hunks)
  • ct-app/core/components/session_to_socket.py (1 hunks)
  • ct-app/core/node.py (8 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • ct-app/core/components/peer.py
🧰 Additional context used
🧠 Learnings (1)
ct-app/core/node.py (2)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:416-416
Timestamp: 2025-04-24T13:17:55.669Z
Learning: In the `SessionToSocket` class, the `receive()` method already decodes the bytes data and returns a string, so calling `.decode()` on the return value would be redundant and cause an error.
🪛 YAMLlint (1.35.1)
ct-app/.configs/core_staging_config.yaml

[error] 117-117: trailing spaces

(trailing-spaces)

ct-app/.configs/core_prod_config.yaml

[error] 116-116: trailing spaces

(trailing-spaces)

🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (30)
ct-app/.configs/core_prod_config.yaml (3)

5-5: LGTM: Environment identifier added.

Adding an environment identifier at the top level is a good practice for configuration files, as it allows the application to determine the environment it's running in.


15-15: LGTM: Session opening flag added.

The new openSessions: 60 flag provides the frequency for session opening operations, aligning with the session management approach introduced in this PR.


40-40: LGTM: Session closing flag added.

The closeSessions: 30 flag specifies how often to run the session cleanup, which helps manage inactive sessions and prevent resource leaks.

ct-app/.configs/core_staging_config.yaml (5)

5-5: LGTM: Environment identifier added.

Adding an environment identifier at the top level is a good practice for configuration files, matching the approach in the production config.


15-15: LGTM: Session opening flag added.

The new openSessions: 60 flag matches the production configuration, ensuring consistent behavior across environments.


40-40: LGTM: Session closing flag added.

The closeSessions: 30 flag matches the production configuration, ensuring consistent session cleanup timing.


56-56: LGTM: APR value updated.

The change to the apr value in the economic model is a deliberate adjustment for the staging environment.


106-106: LGTM: Message multiplier reduced.

The reduction of messageMultiplier to 1 aligns with the comment that explains the privacy/performance tradeoff.

ct-app/core/components/messages/message_format.py (6)

6-14: LGTM: Updated parameter list with new packet_size and sender fields.

The parameter list has been updated to include packet_size for session-based message sizing and an optional sender field for tracking message origins.


18-30: LGTM: Constructor updated with new parameters.

The constructor now properly handles the new packet_size parameter and optional sender parameter with appropriate type conversion.


40-43: LGTM: Size property added.

The new size property calculates the total message size as the product of packet_size and multiplier, providing a clean interface for accessing this derived value.


57-59: LGTM: Empty string validation added.

The added check for empty input strings with a clear error message improves error handling in the parse method.


65-65: LGTM: Improved error formatting.

The backticks around the input string in the error message improve readability when debugging format issues.


85-93: LGTM: Added equality and representation methods.

The added __eq__ and __repr__ methods enhance the class's usability by enabling proper comparison and debugging output.

ct-app/core/node.py (16)

2-2: LGTM: Added asyncio import.

The asyncio import is necessary for the new asyncio.sleep call in the observe_message_queue method.


6-6: LGTM: Added Prometheus import.

The Gauge import from prometheus_client is used for monitoring metrics.


10-10: LGTM: Added PatternMatcher import.

The PatternMatcher import is needed for the new p2p_endpoint property.


12-12: LGTM: Updated API import to include Protocol.

The import now includes Protocol for UDP protocol handling in session management.


18-18: LGTM: Added SessionToSocket import.

The SessionToSocket import is necessary for the new session management functionality.


58-58: LGTM: Added session management dictionary.

The new session_management dictionary property will store session-to-socket mappings for UDP communication.


61-61: LGTM: Added running flag.

The running flag will help manage the lifecycle of the node.


189-189: LGTM: Consistent use of NodeHelper.close_channel.

The code now consistently uses NodeHelper.close_channel across different channel closing methods, which improves maintainability.

Also applies to: 215-215, 262-262


193-193: LGTM: Added closure reason parameters.

Adding specific closure reason parameters helps with tracking and logging the reasons for channel closures.

Also applies to: 219-219, 266-266


235-240: LGTM: Improved channel lookup.

Creating a mapping from addresses to channels first and then iterating over it is more efficient than repeatedly searching through the channels list.

🧰 Tools
🪛 Ruff (0.8.2)

240-240: Use peer_history.get(address) instead of peer_history.get(address, None)

Replace peer_history.get(address, None) with peer_history.get(address)

(SIM910)


251-251: LGTM: Direct channel appending.

Directly appending the channel object to channels_to_close simplifies the code compared to storing IDs and looking them up again.


442-458: LGTM: Added method to close all sessions on startup.

The close_all_sessions method provides a clean way to reset session state during node initialization, which is a good practice for ensuring a consistent state.


459-476: LGTM: Added method to open sessions for allowed addresses.

The open_sessions method efficiently determines which peers need sessions and opens them, using set operations to find the intersection of peers with channels and allowed peers without sessions.
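
The set logic described above can be sketched as follows. The function and argument names are hypothetical and the real `open_sessions` method may structure this differently.

```python
def peers_needing_sessions(allowed, with_channels, open_sessions):
    """Return peers that are allowed, have a channel, and have no session yet."""
    # Intersection keeps peers present in both collections; the difference
    # then drops peers that already have a session.
    return (set(allowed) & set(with_channels)) - set(open_sessions)


candidates = peers_needing_sessions(
    allowed=["peer_a", "peer_b", "peer_c"],
    with_channels=["peer_b", "peer_c", "peer_d"],
    open_sessions={"peer_c": "existing session"},  # dict keys act as the set
)
```

Here only `peer_b` qualifies: `peer_a` has no channel, `peer_c` already has a session, and `peer_d` is not allowed.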


488-494: LGTM: Dynamic tasks discovery.

The new tasks property dynamically discovers decorated methods, which simplifies maintenance as new methods are added.
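
One common way to implement this kind of discovery is to have the decorator tag the function and let a property scan the class for tagged attributes. The sketch below assumes that approach; the `periodic` decorator, the `is_periodic_task` marker and the `Worker` class are all hypothetical, and the PR's own decorators may work differently.

```python
def periodic(func):
    """Hypothetical decorator: tags a method so it can be discovered later."""
    func.is_periodic_task = True
    return func


class Worker:
    @periodic
    def heartbeat(self):
        return "beat"

    def helper(self):
        return "not a task"

    @property
    def tasks(self):
        found = []
        for name in dir(type(self)):
            # Look on the class so that properties are not evaluated.
            attr = getattr(type(self), name)
            if callable(attr) and getattr(attr, "is_periodic_task", False):
                found.append(getattr(self, name))  # bound method
        return found
```

New decorated methods are picked up without touching a central list, which is the maintenance benefit the comment refers to.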


495-519: LGTM: Added p2p_endpoint property.

The p2p_endpoint property cleverly derives the P2P endpoint URL from the node URL using regex patterns, with fallback to the original URL if no pattern matches.


403-403: LGTM: Added await for asyncio.sleep.

The await keyword is correctly used with asyncio.sleep to ensure non-blocking behavior when no channels are found.

Comment thread ct-app/.configs/core_staging_config.yaml
Comment thread ct-app/core/components/session_to_socket.py Outdated
Comment thread ct-app/core/components/session_to_socket.py
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
ct-app/.configs/core_staging_config.yaml (5)

15-15: Introduce openSessions flag for session management
The new flags.core.openSessions: 60 value controls how frequently sessions are opened. Ensure the code uses this interval correctly and document its units (seconds) alongside related flags to avoid confusion.


56-56: Double legacy APR—document rationale
The economicModel.legacy.apr was increased from 0.000001 to 0.000002, effectively doubling the rate. Consider adding a comment or updating documentation to explain the business reason for this change.


106-106: Reduce messageMultiplier for staging—align or document
Lowering peer.messageMultiplier from 5 to 1 will drastically change batching behavior in staging. Either align with the production value or add a comment explaining why staging uses a smaller batch size.


116-118: Remove trailing whitespace from YAML separators
YAML lint flagged trailing spaces on the separator lines. Cleaning up these spaces will prevent spurious warnings:

-# <spaces>
+#
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 117-117: trailing spaces

(trailing-spaces)


119-122: Introduce sessions section—address TODO and justify parameters
A new sessions block defines packet-size, aggregation, and batch-size defaults:

  • Packet size has a TODO (change to 762 in v3.0). Please plan or remove the TODO if it’s no longer needed.
  • aggregatedPackets: 2 and batchSize: 5 differ from production settings. Align them or document why staging uses these lower values.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b87b2f and eb4a786.

📒 Files selected for processing (1)
  • ct-app/.configs/core_staging_config.yaml (6 hunks)
🧰 Additional context used
🪛 YAMLlint (1.35.1)
ct-app/.configs/core_staging_config.yaml

[error] 117-117: trailing spaces

(trailing-spaces)

🔇 Additional comments (2)
ct-app/.configs/core_staging_config.yaml (2)

5-5: Add top-level environment designation
Introducing the environment: staging key provides clarity for environment-specific behavior. Please verify that the application or deployment tooling reads and acts on this value (e.g., for conditional flag defaults or logging).


40-40: Add closeSessions flag to govern session teardown
By setting flags.node.closeSessions: 30, sessions may be closed before the next open interval elapses. Please confirm that the lifecycle logic (open → close → reopen) aligns with your intended session duration and edge-case handling.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (5)
ct-app/core/api/protocol.py (5)

25-32: 🛠️ Refactor suggestion

Improve error handling in fromString method

The error handling should use exception chaining with from err to preserve the original exception context.

    @classmethod
-    def fromString(cls, protocol: str):
+    def fromString(cls, protocol: str) -> 'Protocol':
         try:
             return getattr(cls, protocol.upper())
-        except AttributeError:
+        except AttributeError as err:
             raise ValueError(
                 f"Invalid protocol: {protocol}. Valid values are: {[p.name for p in cls]}"
-            )
+            ) from err
🧰 Tools
🪛 Ruff (0.8.2)

30-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


21-23: 🛠️ Refactor suggestion

Add @unique decorator to Protocol enum

Adding the @unique decorator ensures that all enum values are unique, which is important for protocol definitions.

+@unique
 class Protocol(Enum):
     TCP = TCPProtocol()
     UDP = UDPProtocol()

1-1: 🛠️ Refactor suggestion

Add missing imports for type safety and enum uniqueness

The file is missing the @unique decorator for the Protocol enum and lacks necessary type hint imports.

-from enum import Enum
+from enum import Enum, unique
+from typing import Union

34-37: ⚠️ Potential issue

Add missing name property to Protocol enum

A lowercase name property would make the case-insensitive comparison in your __eq__ method explicit. Enum already provides name, so this is an override: inside it, read the original value through super().name, because accessing self.name there would recurse infinitely.

    @property
    def segment(self):
        return self.value.segment
+
+    @property
+    def name(self):
+        return super().name.lower()

46-50: ⚠️ Potential issue

Fix potential infinite recursion in equality method

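The current equality method lacks type hints and does not handle None (other.name raises AttributeError). It also reads self.name; if name is ever overridden on the enum in terms of itself, that access recurses without end, so reading the value through super().name is the safer choice.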

-    def __eq__(self, other):
+    def __eq__(self, other: Union[str, 'Protocol', None]) -> bool:
+        if other is None:
+            return False
         if isinstance(other, str):
-            return other.lower() == self.name.lower()
+            return other.lower() == super().name.lower()
 
-        return self.name == other.name
+        return super().name.lower() == other.name.lower()
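
The enum suggestions above (uniqueness, chained errors in the string constructor, case-insensitive equality) can be combined in one small sketch. It uses a hypothetical `Transport` enum with plain string values rather than the PR's `Protocol` class, and adds an explicit `__hash__`, since Python sets `__hash__` to `None` on a class that defines `__eq__` without it.

```python
from enum import Enum, unique


@unique
class Transport(Enum):
    """Hypothetical stand-in for the PR's Protocol enum."""

    TCP = "tcp"
    UDP = "udp"

    @classmethod
    def from_string(cls, label: str) -> "Transport":
        try:
            return cls[label.upper()]  # lookup by member name
        except KeyError as err:
            valid = [member.name for member in cls]
            raise ValueError(
                f"Invalid protocol: {label}. Valid values are: {valid}"
            ) from err

    def __eq__(self, other: object) -> bool:
        if isinstance(other, str):
            return other.lower() == self._name_.lower()
        if isinstance(other, Transport):
            return self is other
        return NotImplemented  # None and unrelated types compare unequal

    def __hash__(self) -> int:
        # Restores hashability, which defining __eq__ alone would remove.
        return hash(self._name_)
```

Returning `NotImplemented` for unrelated types lets Python fall back to its default comparison, so `Transport.UDP == None` evaluates to `False` without a dedicated branch.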
🧹 Nitpick comments (5)
ct-app/core/api/protocol.py (1)

4-8: Add documentation to ProtocolTemplate class

The ProtocolTemplate class lacks documentation explaining its purpose and parameters.

 class ProtocolTemplate:
+    """Base class for protocol types.
+    
+    Args:
+        retransmit (bool): Whether the protocol supports message retransmission
+        segment (bool): Whether the protocol supports message segmentation
+        no_delay (bool): Whether the protocol disables Nagle's algorithm
+    """
     def __init__(self, retransmit: bool, segment: bool, no_delay: bool):
         self.retransmit = retransmit
         self.segment = segment
         self.no_delay = no_delay
ct-app/core/api/request_objects.py (4)

92-107: Fix potential performance issue in as_array method

The as_array method in SessionCapabilitiesBody iterates through all instance variables and checks if they're in self.keys, which is inefficient. Consider iterating through self.keys directly.

    @property
    def as_array(self) -> list:
-        return [
-            self.keys[var] for var in vars(self) if var in self.keys and vars(self)[var]
-        ]
+        return [
+            self.keys[key] for key in self.keys if getattr(self, key, False)
+        ]
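
To make the effect of the rewritten property concrete, here is a self-contained sketch. `CapabilityFlags` and its key names are hypothetical stand-ins; the actual `SessionCapabilitiesBody.keys` mapping is not shown in this thread.

```python
class CapabilityFlags:
    # Attribute name -> label sent to the API (labels assumed for illustration).
    keys = {
        "retransmission": "Retransmission",
        "segmentation": "Segmentation",
        "no_delay": "NoDelay",
    }

    def __init__(self, retransmission=False, segmentation=False, no_delay=False):
        self.retransmission = retransmission
        self.segmentation = segmentation
        self.no_delay = no_delay

    @property
    def as_array(self) -> list:
        # Iterate the mapping directly and keep labels whose flag is truthy.
        return [label for attr, label in self.keys.items() if getattr(self, attr, False)]
```

Note that the result is a list of labels for the enabled flags, not a list of booleans, and that it follows the insertion order of `keys`.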

109-116: Add type annotations for return values

The class methods in SessionPathBodyRelayers don't have return type annotations. Consider adding them for better type safety.

 class SessionPathBodyRelayers(ApiRequestObject):
     keys = {
         "relayers": "IntermediatePath",
     }

-    def __init__(self, relayers: list[str]):
+    def __init__(self, relayers: list[str]) -> None:
         super().__init__(vars())

118-128: Consider validating hops value in post_init

In SessionPathBodyHops, you're converting the hops value to an integer but not validating it. Consider adding validation to ensure it's a non-negative value.

    def post_init(self):
        self.hops = int(self.hops)
+        if self.hops < 0:
+            raise ValueError(f"Number of hops must be non-negative, got {self.hops}")

130-137: Add post_init validation for service value

Similar to the SessionPathBodyHops class, the SessionTargetBody class should validate that the service value is appropriate (e.g., non-negative).

    def __init__(self, service: int = 0):
        super().__init__(vars())
+        
+    def post_init(self):
+        self.service = int(self.service)
+        if self.service < 0:
+            raise ValueError(f"Service ID must be non-negative, got {self.service}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eb4a786 and 1fb7d5c.

📒 Files selected for processing (4)
  • ct-app/core/api/hoprd_api.py (3 hunks)
  • ct-app/core/api/protocol.py (1 hunks)
  • ct-app/core/api/request_objects.py (1 hunks)
  • ct-app/core/components/messages/message_format.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • ct-app/core/components/messages/message_format.py
  • ct-app/core/api/hoprd_api.py
🧰 Additional context used
🪛 Ruff (0.8.2)
ct-app/core/api/protocol.py

30-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (2)
ct-app/core/api/request_objects.py (2)

65-70: LGTM: GetPeersBody class implementation

The implementation of GetPeersBody with the quality parameter is correct and follows the established pattern in the codebase.


72-90:

❓ Verification inconclusive

Verify session parameters match API requirements

The CreateSessionBody class has been significantly updated to support session creation with new parameters. Ensure these parameter names and types align with the HOPRd API documentation for session creation.


🏁 Script executed:

#!/bin/bash
# Check how these parameters are used in the HoprdAPI implementation
rg -A 10 "post_session" --type py

Length of output: 2527


🏁 Script executed:

#!/bin/bash
# Locate usages of CreateSessionBody to see how the request is constructed
rg -n "CreateSessionBody" --type py

Length of output: 191


🏁 Script executed:

#!/bin/bash
# Show lines around CreateSessionBody usage in hoprd_api.py
sed -n '260,310p' ct-app/core/api/hoprd_api.py

Length of output: 1963


🏁 Script executed:

#!/bin/bash
# Inspect related request object definitions
rg -A10 "class SessionCapabilitiesBody" ct-app/core/api/request_objects.py
rg -A10 "class SessionTargetBody" ct-app/core/api/request_objects.py
rg -A10 "class SessionPathBodyRelayers" ct-app/core/api/request_objects.py

Length of output: 953


Verify session creation request body fields align with HOPRd API spec

Ensure that the JSON keys and types emitted by CreateSessionBody match the HOPRd API’s session‐creation endpoint. In particular:

  • ct-app/core/api/request_objects.py → class CreateSessionBody, keys mapping:
      • “capabilities” → array of capability labels (the as_array output, one entry per enabled flag)
      • “destination” → string
      • “listen_host” → “listenHost” (string)
      • “path” → nested dict from SessionPathBodyRelayers.as_dict
      • “target” → nested dict from SessionTargetBody.as_dict

  • ct-app/core/api/hoprd_api.py → data = CreateSessionBody(...): confirm that the order and types of the arguments passed (capabilities_body.as_array, destination, listen_host, path_body.as_dict, target_body.as_dict) produce a payload satisfying the published API schema.

Please cross-reference the HOPRd API documentation for session creation and adjust any mismatches in key names or value formats.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
ct-app/core/components/session_to_socket.py (1)

102-126: ⚠️ Potential issue

Fix async method without await and prevent division by zero.

The send_and_receive method is marked as async but doesn't use any await statements. Additionally, there's a risk of division by zero when returning recv_size / sent_size if sent_size is 0, and the variable naming creates confusion by reusing the message name.

-    async def send_and_receive(self, message: MessageFormat) -> float:
+    def send_and_receive(self, message: MessageFormat) -> float:
+        """
+        Sends a message to the peer and waits for a response.
+        
+        Args:
+            message: Formatted message to send
+            
+        Returns:
+            float: Ratio of received size to sent size (0 for failures or timeouts)
+        """
         message.update_timestamp()

         sent_size = self.send(message.bytes())
         MESSAGES_STATS.labels("sent", message.sender, message.relayer).inc(
             sent_size / message.packet_size
         )

         recv_message, recv_size, timestamp = self.receive(sent_size)
         MESSAGES_STATS.labels("relayed", message.sender, message.relayer).inc(
             recv_size / message.packet_size
         )

         if recv_message is None:
             return 0

         try:
-            message = MessageFormat.parse(recv_message)
+            received_msg = MessageFormat.parse(recv_message)
         except ValueError:
             return 0
         else:
-            rtt = (timestamp - message.timestamp) / 1000
-            MESSAGES_DELAYS.labels(message.sender, message.relayer).observe(rtt)
-            return recv_size / sent_size
+            rtt = (timestamp - received_msg.timestamp) / 1000
+            MESSAGES_DELAYS.labels(received_msg.sender, received_msg.relayer).observe(rtt)
+            return recv_size / sent_size if sent_size > 0 else 0
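
The guarded ratio can be exercised without a HOPR node. In the sketch below, a UDP socket sends a datagram to its own bound loopback address, which stands in for the session echoing the message back. `round_trip` is a hypothetical function and the metrics calls from the PR are left out.

```python
import socket
import time


def round_trip(sock: socket.socket, address, payload: bytes):
    """Send one datagram and return (delivery ratio, round-trip time in seconds)."""
    started = time.monotonic()
    sent_size = sock.sendto(payload, address)
    try:
        data, _ = sock.recvfrom(max(sent_size, 1))
    except socket.timeout:
        return 0.0, None  # nothing came back in time
    rtt = time.monotonic() - started
    # Guard the division so an empty send cannot raise ZeroDivisionError.
    ratio = len(data) / sent_size if sent_size > 0 else 0.0
    return ratio, rtt


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 0))  # let the OS pick a free port
sock.settimeout(0.5)
ratio, rtt = round_trip(sock, sock.getsockname(), b"ping")
sock.close()
```

Note that turning `send_and_receive` into a plain function, as the suggestion does, also requires removing `await` at every call site.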
♻️ Duplicate comments (3)
ct-app/core/components/session_to_socket.py (3)

62-76: Remove unused variable in create_socket method.

The conn variable is initialized but never used, and is always returned as None. This can be confusing to code readers.

    def create_socket(self, timeout: Optional[int]):
        if self.session.protocol == Protocol.UDP:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        else:
            raise ValueError(f"Invalid protocol: {self.session.protocol}")

        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUF_SIZE)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUF_SIZE)

        if timeout is not None:
            s.settimeout(timeout)

-        conn = None
-
-        return s, conn
+        return s, None

31-32: 🛠️ Refactor suggestion

Fix exception chaining to properly preserve error context.

When re-raising exceptions, you should use explicit exception chaining with from e to maintain the original error context. This helps with debugging by preserving the complete error trace.

        try:
            self.socket, self.conn = self.create_socket(timeout)
        except (socket.error, ValueError) as e:
-            raise ValueError(f"Error while creating socket: {e}")
+            raise ValueError(f"Error while creating socket: {e}") from e
🧰 Tools
🪛 Ruff (0.8.2)

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


87-101: 🛠️ Refactor suggestion

Fix inconsistency between docstring and implementation in receive method.

The docstring mentions returning a list of split messages, but the method actually returns a tuple of (decoded_data, size, timestamp). Additionally, the method only handles socket timeout exceptions but should also handle other socket errors.

    def receive(self, size: int) -> tuple[Optional[str], int, Optional[int]]:
        """
-        Receives data from the peer. In case off multiple message in the same packet, which should
-        not happen, they are already split and returned as a list.
+        Receives data from the peer and extracts relevant information.
+        
+        Args:
+            size: Maximum size of data to receive in bytes
+            
+        Returns:
+            tuple containing:
+                - Decoded data as string or None if no data received
+                - Size of received data in bytes
+                - Timestamp in milliseconds when data was received, or None if no data
+                
+        Raises:
+            ValueError: If protocol is not UDP
         """
         if self.session.protocol != Protocol.UDP:
             raise ValueError(f"Invalid protocol: {self.session.protocol}")

         try:
             data, _ = self.socket.recvfrom(size)
             now = int(datetime.now().timestamp() * 1000)
             return data.rstrip(b"\0").decode(), len(data), now
         except socket.timeout:
             return None, 0, None
+        except socket.error as e:
+            # Log the error for debugging purposes
+            import logging
+            logging.error(f"Error receiving data: {e}")
+            return None, 0, None
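
A compact variant of the same idea, as a sketch: `socket.error` is an alias of `OSError` and `socket.timeout` is a subclass of it, so a single `except OSError` covers both branches. The free function `receive` and the logger name are assumptions, not the PR's code.

```python
import logging
import socket
import time
from typing import Optional

logger = logging.getLogger("session_to_socket_sketch")  # hypothetical logger name


def receive(sock: socket.socket, size: int) -> tuple[Optional[str], int, Optional[int]]:
    """Return (decoded text, raw size in bytes, receive time in ms) or (None, 0, None)."""
    try:
        data, _ = sock.recvfrom(size)
    except socket.timeout:
        return None, 0, None  # expected when the peer does not answer in time
    except OSError as e:
        logger.error("Error receiving data: %s", e)
        return None, 0, None
    now_ms = int(time.time() * 1000)
    # Strip trailing NUL padding before decoding; report the raw size.
    return data.rstrip(b"\0").decode(), len(data), now_ms
```

Keeping the timeout branch separate avoids logging an error for the routine case where no reply arrives.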
🧹 Nitpick comments (2)
ct-app/core/components/session_to_socket.py (2)

24-24: Fix misleading timeout type annotation.

The timeout parameter has type Optional[int] but the default value is a float (0.05). This inconsistency between type annotation and actual usage could lead to confusion.

-    def __init__(
-        self, session: Session, connect_address: str, timeout: Optional[int] = 0.05
-    ):
+    def __init__(
+        self, session: Session, connect_address: str, timeout: Optional[float] = 0.05
+    ):

62-62: Fix inconsistent timeout type annotation.

The type annotation for timeout in create_socket doesn't match the annotation in __init__.

-    def create_socket(self, timeout: Optional[int]):
+    def create_socket(self, timeout: Optional[float]):
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1fb7d5c and eed51d8.

📒 Files selected for processing (2)
  • ct-app/core/components/messages/message_format.py (3 hunks)
  • ct-app/core/components/session_to_socket.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • ct-app/core/components/messages/message_format.py
🧰 Additional context used
🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py

32-32: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

Comment thread ct-app/core/components/session_to_socket.py
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
ct-app/.configs/core_staging_config.yaml (1)

106-106: Document change rationale for messageMultiplier
You’ve reduced messageMultiplier from 5 to 1, which impacts batching and privacy/performance trade-offs. Ensure this decision is clearly documented for future maintainers.

🧹 Nitpick comments (3)
ct-app/.configs/core_staging_config.yaml (3)

57-57: Document rationale for APR update
The apr value under economicModel.legacy was changed to 0.000002. Please add a comment or update the PR description explaining why this rate was doubled and what behavioral impact is expected.


117-117: Remove trailing spaces
YAMLlint indicates trailing spaces on this line. Cleaning them up will keep the config lint-free.

-# ============================================================================= 
+# =============================================================================
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 117-117: trailing spaces

(trailing-spaces)


120-120: Address the TODO for packetSize
There’s a # TODO 3.0: change to 762 note on packetSize. Consider creating a ticket or roadmap item to ensure this update isn’t overlooked.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eed51d8 and e40eb7e.

📒 Files selected for processing (1)
  • ct-app/.configs/core_staging_config.yaml (6 hunks)
🧰 Additional context used
🪛 YAMLlint (1.35.1)
ct-app/.configs/core_staging_config.yaml

[error] 117-117: trailing spaces

(trailing-spaces)

🔇 Additional comments (4)
ct-app/.configs/core_staging_config.yaml (4)

5-5: Ensure loader reads environment key
The new top-level environment: staging entry must be consumed by your configuration parser. Please verify that the code references config.environment and switches to the correct environment settings.


15-15: Verify placement of openSessions flag
The openSessions flag is defined under flags.core, but our session management logic lives in core/node.py. Confirm that this flag is being read from the correct namespace.


40-40: Verify placement of closeSessions flag
The closeSessions flag sits under flags.node. Ensure that your code paths for closing sessions reference this exact key and not another section.


119-123:

✅ Verification successful

Verify session parameters alignment with code
The new sessions block defines packetSize, aggregatedPackets, and batchSize. Confirm these keys match those consumed by your session-to-socket and node buffer logic (e.g., look out for numPackets vs. aggregatedPackets).


🏁 Script executed:

#!/bin/bash
# Search for references to session config keys in core logic
rg -R "sessions\.(packetSize|aggregatedPackets|batchSize)" -n ct-app/core

Length of output: 480


🏁 Script executed:

#!/bin/bash
# Correct ripgrep recursion usage: search for session config references in core logic
rg -n 'sessions\.(packetSize|aggregatedPackets|batchSize)' ct-app/core

Length of output: 460


Session configuration keys match core logic usage
Verified that the new sessions keys correspond exactly to how they’re consumed in the core code:

  • ct-app/core/node.py:414 (batchSize in loop range)
  • ct-app/core/components/peer.py:160–162 (aggregatedPackets, packetSize)
  • ct-app/core/components/peer.py:167 (batchSize in sleep multiplier)

No further action needed.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
ct-app/.configs/core_staging_config.yaml (3)

56-56: Document rationale for updated APR parameter

The APR under economicModel.legacy has been bumped to 0.000002. Please add a comment or update the PR description to explain the economic model adjustment (e.g., simulation outcomes or policy decision) so future reviewers understand the change.


106-106: Justify reduction of messageMultiplier

Lowering peer.messageMultiplier from its previous value to 1 impacts batching behavior, privacy guarantees, and performance. Please include a rationale (in code comments or PR notes) describing why this trade-off was made and its expected effects.


117-117: Remove trailing whitespace to satisfy YAML lint

The blank comment line contains a trailing space, triggering a trailing-spaces lint error. Apply this diff to remove it:

-# 
+#
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 117-117: trailing spaces

(trailing-spaces)
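
For reference, lines of this kind can be located without running the linter. This is a minimal sketch, not how YAMLlint works internally. The function name and the sample text are hypothetical.

```python
def trailing_space_lines(text: str) -> list[int]:
    """Return the 1-based numbers of lines that end in a space or tab."""
    return [
        number
        for number, line in enumerate(text.splitlines(), start=1)
        if line != line.rstrip(" \t")
    ]


# Hypothetical sample: the second line is a comment marker followed by a space.
sample = "a: 1\n# \nb: 2\n"
print(trailing_space_lines(sample))
```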

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e40eb7e and 3032fbb.

📒 Files selected for processing (1)
  • ct-app/.configs/core_staging_config.yaml (6 hunks)
🧰 Additional context used
🪛 YAMLlint (1.35.1)
ct-app/.configs/core_staging_config.yaml

[error] 117-117: trailing spaces

(trailing-spaces)

🔇 Additional comments (4)
ct-app/.configs/core_staging_config.yaml (4)

5-5: Validate new environment key usage

Ensure that the configuration loader recognizes the top-level environment: staging field and that any environment-specific logic (e.g., feature toggles, logging levels) correctly branches on its value.


15-15: Ensure openSessions flag is consumed by session management

Confirm that flags.core.openSessions: 60 is being read by the new session-opening logic in ct-app/core/node.py (or equivalent) and properly throttles how often sessions are initiated.


40-40: Approve addition of closeSessions flag

Replacing the old observeRelayedMessages flag with flags.node.closeSessions: 30 aligns with the new session lifecycle design and looks correctly scoped under node.


119-122: Review new sessions configuration block

The newly added sessions section introduces packetSize: 452, aggregatedPackets: 3, and batchSize: 5. Please verify these defaults against production settings or provide justification for staging-specific values. Also, track the TODO on packetSize (update to 762 in v3.0) and ensure it’s addressed in a follow-up.
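
To make the sizes involved concrete, here is a minimal sketch of how the three values could combine. It assumes that one message is padded to `packetSize * aggregatedPackets` bytes and that `batchSize` such messages form one batch. The review only states where each key is read, not how they are combined, so the class, the padding scheme and the function below are hypothetical illustrations and not the ct-app implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SessionsConfig:
    # Defaults are the staging values quoted in the comment above.
    packet_size: int = 452
    aggregated_packets: int = 3
    batch_size: int = 5

    @property
    def payload_size(self) -> int:
        """Bytes per message, assuming one message spans `aggregated_packets` packets."""
        return self.packet_size * self.aggregated_packets


def build_batch(cfg: SessionsConfig, body: bytes) -> list[bytes]:
    """Pad `body` with zero bytes to the payload size and repeat it `batch_size` times."""
    if len(body) > cfg.payload_size:
        raise ValueError("body is larger than the configured payload size")
    padded = body.ljust(cfg.payload_size, b"\0")
    return [padded for _ in range(cfg.batch_size)]


cfg = SessionsConfig()
batch = build_batch(cfg, b"hello")
print(len(batch), len(batch[0]))
```

Under these assumptions, changing `packetSize` to 762 as the TODO suggests would raise the payload per message from 1356 to 2286 bytes, so any buffer sized from these values would need to follow.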

@jeandemeusy jeandemeusy merged commit 62894de into main May 12, 2025
1 check was pending
@jeandemeusy jeandemeusy deleted the jean/hoprd-2-2-compliance branch May 12, 2025 14:34
@coderabbitai coderabbitai Bot mentioned this pull request Jun 2, 2025
Development

Successfully merging this pull request may close these issues.

Send message through sessions instead of using the API

3 participants