[rollout] test: bucketed transfer utils #5309

Open

pengwu22 wants to merge 2 commits into verl-project:main from pengwu22:pw/test-wt-utils

Conversation

@pengwu22
Collaborator

What does this PR do?

  • Abstract the current vLLM weight-update helper out into a dedicated module with clear interfaces, and add tests

Test

For changes that cannot be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results such as training curve plots, evaluation results, etc.

  • Extra unit tests covering the shm and IPC transfer paths

Checklist Before Submitting

Important

Please check all the following items before requesting a review; otherwise, the reviewer might deprioritize this PR.

Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request refactors the weight transfer logic into a new, dedicated module, bucketed_weight_transfer.py, improving code organization and testability. However, a critical security vulnerability has been identified in the new bucketed weight transfer mechanism: insecure deserialization that enables arbitrary code execution. Using ZMQ's recv_pyobj (which is pickle-based) over a predictable IPC socket path in /tmp/ allows any user on the same host to achieve code execution. This must be addressed by using a secure serialization format and avoiding the transmission of executable callables. Additionally, two high-severity robustness issues were found: the shared-memory creation path bypasses an existing error-handling helper, and the CUDA IPC reconstruction relies on a fragile hardcoded index.
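
A minimal sketch, assuming pyzmq, of the direction this points in: exchange plain-data metadata with recv_json instead of pickled objects with recv_pyobj, so nothing executable can be deserialized from the socket. The socket path and message fields below are illustrative, not the PR's.

    import zmq

    ctx = zmq.Context()
    sock = ctx.socket(zmq.PULL)
    sock.bind("ipc:///tmp/example_weight_transfer.sock")  # hypothetical path

    # Plain data only (names, dtypes, shapes, offsets); no callables can ride along.
    msg = sock.recv_json()
    # e.g. msg == {"name": "...", "dtype": "bfloat16", "shape": [151936, 4096], "offset": 0}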

Comment on lines +45 to +51

    func, args = handle
    list_args = list(args)
    if device_id is not None:
        # the key is to change device id to the current device id
        # in case two processes have different CUDA_VISIBLE_DEVICES
        list_args[6] = device_id
    buffer = func(*list_args)

Contributor

high

This function relies on a hardcoded index 6 to modify the device ID for CUDA IPC tensor reconstruction. This is highly fragile as it depends on the internal data structure returned by torch.multiprocessing.reductions.reduce_tensor, which is not a public API and can change without notice in future PyTorch versions. A change in PyTorch could lead to silent failures or hard-to-debug errors. To make this slightly more robust, an assertion should be added to validate the structure of the arguments before modification.

Suggested change

-     func, args = handle
-     list_args = list(args)
-     if device_id is not None:
-         # the key is to change device id to the current device id
-         # in case two processes have different CUDA_VISIBLE_DEVICES
-         list_args[6] = device_id
-     buffer = func(*list_args)
+     func, args = handle
+     list_args = list(args)
+     if device_id is not None:
+         # The tuple from reduce_tensor is (rebuild_fn, (tensor_cls, tensor_size, storage_handle, storage_offset, requires_grad, device_id, ipc_handle))
+         # We are modifying the device_id at index 6.
+         assert len(list_args) >= 7, "Unexpected arguments structure for CUDA IPC tensor reconstruction."
+         # the key is to change device id to the current device id
+         # in case two processes have different CUDA_VISIBLE_DEVICES
+         list_args[6] = device_id
+     buffer = func(*list_args)
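
A minimal sketch, not the PR's code, of the reduce_tensor round trip this comment refers to; reduce_tensor is a PyTorch internal, so the layout of args (including the position of the device id that the PR patches) is an assumption that may change between versions.

    import torch
    from torch.multiprocessing.reductions import reduce_tensor

    # Producer process: turn a CUDA tensor into a small, picklable handle
    # instead of copying the underlying data.
    tensor = torch.arange(8, device="cuda")
    rebuild_fn, args = reduce_tensor(tensor)

    # Consumer process (same host, different process): after receiving
    # (rebuild_fn, args), it reconstructs a tensor backed by the same GPU memory:
    #
    #     rebuilt = rebuild_fn(*args)
    #
    # This call is where the device id inside `args` must be rewritten first
    # when the two processes see different CUDA_VISIBLE_DEVICES mappings.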

Collaborator Author

as original, no change


# Create unique name for shared memory
shm_name = f"verl_weights_{uuid.uuid4().hex}"
shm = shared_memory.SharedMemory(name=shm_name, create=True, size=self.bucket_size)

Contributor

high

The create_shared_memory helper function, which correctly handles attaching to existing shared memory segments in case of a FileExistsError, is defined in this file but not used here. Instead, shared_memory.SharedMemory(create=True) is called directly. This could lead to unhandled FileExistsError exceptions if a shared memory segment from a previous crashed run was not cleaned up. To improve robustness and reuse the existing helper, please use create_shared_memory.

Suggested change

-     shm = shared_memory.SharedMemory(name=shm_name, create=True, size=self.bucket_size)
+     shm = create_shared_memory(self.bucket_size, shm_name)
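
A hedged sketch of the behavior this comment attributes to create_shared_memory; the PR's actual helper may differ in signature and details.

    from multiprocessing import shared_memory

    def create_shared_memory(size: int, name: str) -> shared_memory.SharedMemory:
        # Try to create a fresh segment; if one with the same name survived a
        # previous crashed run, attach to it instead of raising FileExistsError.
        try:
            return shared_memory.SharedMemory(name=name, create=True, size=size)
        except FileExistsError:
            return shared_memory.SharedMemory(name=name, create=False)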

Collaborator Author

as original. this part not changed

@wuxibin89
Collaborator

Hold this PR until #5029 is merged.
