torchrl/collectors/_single.py: 21 additions & 2 deletions
```diff
@@ -46,6 +46,20 @@
 from torchrl.weight_update.utils import _resolve_model
 
 
+def _cuda_sync_if_initialized():
+    """Synchronize CUDA only if it has been initialized.
+
+    This is a safe alternative to calling `torch.cuda.synchronize()` directly.
+    In forked subprocesses on machines with CUDA, calling `synchronize()` will
+    fail with "Cannot re-initialize CUDA in forked subprocess" if CUDA was
+    initialized in the parent process before fork. By checking
+    `is_initialized()` first, we skip the sync in such cases since no CUDA
+    operations have occurred in this process.
+    """
+    if torch.cuda.is_initialized():
+        torch.cuda.synchronize()
+
+
 @accept_remote_rref_udf_invocation
 class Collector(BaseCollector):
     """Generic data collector for RL problems. Requires an environment constructor and a policy.
```
```diff
@@ -518,9 +532,14 @@ def _setup_devices(
     def _get_sync_fn(self, device: torch.device | None) -> Callable:
         """Get the appropriate synchronization function for a device."""
         if device is not None and device.type != "cuda":
-            # Cuda handles sync
+            # When destination is not CUDA, we may need to sync to wait for
+            # async GPU→CPU transfers to complete before proceeding.
             if torch.cuda.is_available():
-                return torch.cuda.synchronize
+                # Return a safe wrapper that only syncs if CUDA was actually
+                # initialized. This avoids "Cannot re-initialize CUDA in forked
+                # subprocess" errors when using fork start method on GPU machines
+                # with CPU-only collectors.
+                return _cuda_sync_if_initialized
             elif torch.backends.mps.is_available() and hasattr(torch, "mps"):
                 return torch.mps.synchronize
             elif hasattr(torch, "npu") and torch.npu.is_available():
```
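The comment about async GPU→CPU transfers refers to the usual non-blocking copy hazard. A hedged sketch (illustrative, not from this file) of why the returned sync function matters when the destination is CPU:

```python
import torch

if torch.cuda.is_available():
    gpu = torch.randn(1024, device="cuda")
    # A pinned destination makes the device-to-host copy truly asynchronous.
    cpu = torch.empty(1024, pin_memory=True)
    cpu.copy_(gpu, non_blocking=True)
    # The copy is only enqueued on the current CUDA stream at this point;
    # reading `cpu` before synchronizing could observe stale values.
    torch.cuda.synchronize()
    assert torch.equal(cpu, gpu.cpu())
```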