Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions verl/single_controller/base/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ def _get_free_port():
sock.bind(("", 0))
return sock.getsockname()[1]

@staticmethod
def _get_reusable_free_port():
listen_sock: socket.socket = socket.socket()
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also need torch set socket.SO_REUSEPORT flag when listening on MASTER_PORT.

listen_sock.bind(("", 0))
_, port, *_ = listen_sock.getsockname()
return port
Comment on lines +66 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The socket listen_sock is not closed after use, which will lead to a resource leak. This can exhaust file descriptors if the function is called frequently. The socket should be created within a with statement to ensure it is properly closed, similar to the existing _get_free_port method.

Suggested change
listen_sock: socket.socket = socket.socket()
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
listen_sock.bind(("", 0))
_, port, *_ = listen_sock.getsockname()
return port
with socket.socket() as listen_sock:
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
listen_sock.bind(("", 0))
return listen_sock.getsockname()[1]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you comment on Gemini's review?


def get_availale_master_addr_port(self):
warnings.warn(
"This function is deprecated due to typo in name; Please use `get_available_master_addr_port` instead",
Expand All @@ -71,6 +80,9 @@ def get_availale_master_addr_port(self):
def get_available_master_addr_port(self):
    """Return ``(address, port)`` for a master endpoint, both as strings.

    The address is the value of ``self._get_node_ip()`` with any leading or
    trailing ``[`` / ``]`` characters stripped; the port is the result of
    ``self._get_free_port()`` converted to ``str``.
    """
    node_addr = self._get_node_ip().strip("[]")
    free_port = self._get_free_port()
    return node_addr, str(free_port)

def get_available_master_addr_reusable_port(self):
    """Return ``(address, port)`` as strings, using the reusable-port probe.

    The address is the value of ``self._get_node_ip()`` with any leading or
    trailing ``[`` / ``]`` characters stripped; the port is the result of
    ``self._get_reusable_free_port()`` converted to ``str``.
    """
    node_addr = self._get_node_ip().strip("[]")
    reusable_port = self._get_reusable_free_port()
    return node_addr, str(reusable_port)


# we assume that in each WorkerGroup, there is a Master Worker
class Worker(WorkerHelper):
Expand Down