-
Notifications
You must be signed in to change notification settings - Fork 2.8k
[worker] feat: add reusable port #4377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[worker] feat: add reusable port #4377
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new method _get_reusable_free_port to find a reusable free port, intended to prevent port conflicts. While the goal is valid, the current implementation has a critical resource leak due to an unclosed socket. Additionally, it suffers from a race condition that undermines its purpose of avoiding port conflicts, as the port can be taken by another process after it's been identified but before it's used. My review includes a critical comment to fix the resource leak and a high-severity comment explaining the design flaw related to the race condition.
| listen_sock: socket.socket = socket.socket() | ||
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | ||
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | ||
| listen_sock.bind(("", 0)) | ||
| _, port, *_ = listen_sock.getsockname() | ||
| return port |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The socket listen_sock is not closed after use, which will lead to a resource leak. This can exhaust file descriptors if the function is called frequently. The socket should be created within a with statement to ensure it is properly closed, similar to the existing _get_free_port method.
| listen_sock: socket.socket = socket.socket() | |
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
| listen_sock.bind(("", 0)) | |
| _, port, *_ = listen_sock.getsockname() | |
| return port | |
| with socket.socket() as listen_sock: | |
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
| listen_sock.bind(("", 0)) | |
| return listen_sock.getsockname()[1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you comment on Gemini's review?
| def _get_reusable_free_port(): | ||
| listen_sock: socket.socket = socket.socket() | ||
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | ||
| listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also need torch set socket.SO_REUSEPORT flag when listening on MASTER_PORT.
What does this PR do?
add a reusable port allocation method, avoiding port conflict when port is created in a worker and later grabbed by other process. Intentionally leave the port occupied so that it can be used by whoever really need the port with the port reusable option on.
Checklist Before Starting
[{modules}] {type}: {description}(This will be checked by the CI){modules}includefsdp,megatron,sglang,vllm,rollout,trainer,ci,training_utils,recipe,hardware,deployment,ray,worker,single_controller,misc,perf,model,algo,env,tool,ckpt,doc,data,like[megatron, fsdp, doc]{type}is infeat,fix,refactor,chore,test[BREAKING]to the beginning of the title.[BREAKING][fsdp, megatron] feat: dynamic batchingTest
API and Usage Example
# Add code snippet or script demonstrating how to use thisDesign & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=alwaysci-requestchannel in theverlSlack workspace. (If not accessible, please try the Feishu group (飞书群).)