Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions p2p/benchmarks/benchmark_uccl.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,29 +370,29 @@ def _run_server_ipc(args, ep):

# Warm-up transfer
if args.async_api:
ok, transfer_id = ep.recv_ipc_async(conn_id, ptr, size)
assert ok, "[Server] recv_ipc_async error"
ok, transfer_id = ep.recv_async(conn_id, 0, ptr, size)
assert ok, "[Server] recv_async error"
is_done = False
while not is_done:
ok, is_done = ep.poll_async(transfer_id)
assert ok, "[Server] poll_async error"
else:
ok = ep.recv_ipc(conn_id, ptr, size)
assert ok, "[Server] recv_ipc error"
ok = ep.recv(conn_id, 0, ptr, size)
assert ok, "[Server] recv error"

start = time.perf_counter()
total_recv = 0
for _ in range(args.iters):
if args.async_api:
ok, transfer_id = ep.recv_ipc_async(conn_id, ptr, size)
assert ok, "[Server] recv_ipc_async error"
ok, transfer_id = ep.recv_async(conn_id, 0, ptr, size)
assert ok, "[Server] recv_async error"
is_done = False
while not is_done:
ok, is_done = ep.poll_async(transfer_id)
assert ok, "[Server] poll_async error"
else:
ok = ep.recv_ipc(conn_id, ptr, size)
assert ok, "[Server] recv_ipc error"
ok = ep.recv(conn_id, 0, ptr, size)
assert ok, "[Server] recv error"
total_recv += size
elapsed = time.perf_counter() - start

Expand All @@ -417,29 +417,29 @@ def _run_client_ipc(args, ep, remote_gpu_idx):

# Warm-up transfer
if args.async_api:
ok, transfer_id = ep.send_ipc_async(conn_id, ptr, size)
assert ok, "[Client] send_ipc_async error"
ok, transfer_id = ep.send_async(conn_id, 0, ptr, size)
assert ok, "[Client] send_async error"
is_done = False
while not is_done:
ok, is_done = ep.poll_async(transfer_id)
assert ok, "[Client] poll_async error"
else:
ok = ep.send_ipc(conn_id, ptr, size)
assert ok, "[Client] send_ipc error"
ok = ep.send(conn_id, 0, ptr, size)
assert ok, "[Client] send error"

start = time.perf_counter()
total_sent = 0
for _ in range(args.iters):
if args.async_api:
ok, transfer_id = ep.send_ipc_async(conn_id, ptr, size)
assert ok, "[Client] send_ipc_async error"
ok, transfer_id = ep.send_async(conn_id, 0, ptr, size)
assert ok, "[Client] send_async error"
is_done = False
while not is_done:
ok, is_done = ep.poll_async(transfer_id)
assert ok, "[Client] poll_async error"
else:
ok = ep.send_ipc(conn_id, ptr, size)
assert ok, "[Client] send_ipc error"
ok = ep.send(conn_id, 0, ptr, size)
assert ok, "[Client] send error"
total_sent += size
elapsed = time.perf_counter() - start

Expand Down
152 changes: 62 additions & 90 deletions p2p/collective.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,25 +393,22 @@ def send(self, tensor: torch.Tensor, dst: int):
ptr, size = self._get_buffer_info(tensor)
conn_id = self.send_connections[dst]

if self.local_connections[dst]:
if self.use_copy_engine_for_intra:
# Use IPC for local connection (no memory registration needed)
ok = self.ep.send_ipc(conn_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed to initiate IPC send to rank {dst}")
else:
dist.send(tensor, dst, group=self.group)
if self.local_connections[dst] and not self.use_copy_engine_for_intra:
dist.send(tensor, dst, group=self.group)
else:
# Use RDMA for remote connection (requires memory registration)
mr_id = self.check_tensor_registered(tensor)
if mr_id == None:
mr_id = (
0
if self.local_connections[dst]
else self.check_tensor_registered(tensor)
)
if mr_id is None:
raise RuntimeError(
f"Tensor memory not registered for remote communication with rank {dst}. "
f"Call register_tensor() first for tensors used with remote ranks."
)
ok = self.ep.send(conn_id, mr_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed to initiate RDMA send to rank {dst}")
raise RuntimeError(f"Failed to send to rank {dst}")

def recv(self, tensor: torch.Tensor, src: int):
"""Receive tensor from source rank (synchronous)."""
Expand All @@ -427,25 +424,22 @@ def recv(self, tensor: torch.Tensor, src: int):
ptr, size = self._get_buffer_info(tensor)
conn_id = self.recv_connections[src]

if self.local_connections[src]:
if self.use_copy_engine_for_intra:
# Use IPC for local connection (no memory registration needed)
ok = self.ep.recv_ipc(conn_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed to initiate IPC recv from rank {src}")
else:
dist.recv(tensor, src, group=self.group)
if self.local_connections[src] and not self.use_copy_engine_for_intra:
dist.recv(tensor, src, group=self.group)
else:
# Use RDMA for remote connection (requires memory registration)
mr_id = self.check_tensor_registered(tensor)
if mr_id == None:
mr_id = (
0
if self.local_connections[src]
else self.check_tensor_registered(tensor)
)
if mr_id is None:
raise RuntimeError(
f"Tensor memory not registered for remote communication with rank {src}. "
f"Call register_tensor() first for tensors used with remote ranks."
)
ok = self.ep.recv(conn_id, mr_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed to initiate RDMA recv from rank {src}")
raise RuntimeError(f"Failed to recv from rank {src}")

def isend(self, tensor: torch.Tensor, dst: int) -> Union[int, dist.Work]:
"""Initiate asynchronous send (non-blocking)."""
Expand All @@ -461,31 +455,25 @@ def isend(self, tensor: torch.Tensor, dst: int) -> Union[int, dist.Work]:
ptr, size = self._get_buffer_info(tensor)
conn_id = self.send_connections[dst]

if self.local_connections[dst]:
if self.use_copy_engine_for_intra:
# Use IPC async for local connection (no memory registration needed)
ok, transfer_handle = self.ep.send_ipc_async(conn_id, ptr, size)
if not ok:
raise RuntimeError(
f"Failed to initiate async IPC send to rank {dst}"
)
return transfer_handle
else:
# Use NCCL - start immediately and return Work object
op = dist.P2POp(dist.isend, tensor, dst, group=self.group)
reqs = dist.batch_isend_irecv([op])
return reqs[0]
if self.local_connections[dst] and not self.use_copy_engine_for_intra:
# Use NCCL - start immediately and return Work object
op = dist.P2POp(dist.isend, tensor, dst, group=self.group)
reqs = dist.batch_isend_irecv([op])
return reqs[0]
else:
# Use RDMA async for remote connection (requires memory registration)
mr_id = self.check_tensor_registered(tensor)
if mr_id == None:
mr_id = (
0
if self.local_connections[dst]
else self.check_tensor_registered(tensor)
)
if mr_id is None:
raise RuntimeError(
f"Tensor memory not registered for remote communication with rank {dst}. "
f"Call register_tensor() first for tensors used with remote ranks."
)
ok, transfer_handle = self.ep.send_async(conn_id, mr_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed to initiate async RDMA send to rank {dst}")
raise RuntimeError(f"Failed to initiate async send to rank {dst}")
return transfer_handle

def irecv(self, tensor: torch.Tensor, src: int) -> Union[int, dist.Work]:
Expand All @@ -504,33 +492,25 @@ def irecv(self, tensor: torch.Tensor, src: int) -> Union[int, dist.Work]:
ptr, size = self._get_buffer_info(tensor)
conn_id = self.recv_connections[src]

if self.local_connections[src]:
if self.use_copy_engine_for_intra:
# Use IPC async for local connection (no memory registration needed)
ok, transfer_handle = self.ep.recv_ipc_async(conn_id, ptr, size)
if not ok:
raise RuntimeError(
f"Failed to initiate async IPC recv from rank {src}"
)
return transfer_handle
else:
# Use NCCL - start immediately and return Work object
op = dist.P2POp(dist.irecv, tensor, src, group=self.group)
reqs = dist.batch_isend_irecv([op])
return reqs[0]
if self.local_connections[src] and not self.use_copy_engine_for_intra:
# Use NCCL - start immediately and return Work object
op = dist.P2POp(dist.irecv, tensor, src, group=self.group)
reqs = dist.batch_isend_irecv([op])
return reqs[0]
else:
# Use RDMA async for remote connection (requires memory registration)
mr_id = self.check_tensor_registered(tensor)
if mr_id == None:
mr_id = (
0
if self.local_connections[src]
else self.check_tensor_registered(tensor)
)
if mr_id is None:
raise RuntimeError(
f"Tensor memory not registered for remote communication with rank {src}. "
f"Call register_tensor() first for tensors used with remote ranks."
)
ok, transfer_handle = self.ep.recv_async(conn_id, mr_id, ptr, size)
if not ok:
raise RuntimeError(
f"Failed to initiate async RDMA recv from rank {src}"
)
raise RuntimeError(f"Failed to initiate async recv from rank {src}")
return transfer_handle

def test(self, transfer_handle: Union[int, dist.Work]) -> bool:
Expand Down Expand Up @@ -643,50 +623,42 @@ def batch_isend_irecv(
continue # Skip self-communication

if op_type == "send":
if self.local_connections[peer]:
if self.use_copy_engine_for_intra:
ptr, size = self._get_buffer_info(tensor)
conn_id = self.send_connections[peer]
ok, handle = self.ep.send_ipc_async(conn_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed IPC send to {peer}")
int_handles.append(handle)
else:
p2p_ops.append(
dist.P2POp(dist.isend, tensor, peer, group=self.group)
)
if self.local_connections[peer] and not self.use_copy_engine_for_intra:
p2p_ops.append(
dist.P2POp(dist.isend, tensor, peer, group=self.group)
)
else:
ptr, size = self._get_buffer_info(tensor)
conn_id = self.send_connections[peer]
mr_id = self.check_tensor_registered(tensor)
mr_id = (
0
if self.local_connections[peer]
else self.check_tensor_registered(tensor)
)
if mr_id is None:
raise RuntimeError(f"Tensor not registered for rank {peer}")
ok, handle = self.ep.send_async(conn_id, mr_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed RDMA send to {peer}")
raise RuntimeError(f"Failed send to {peer}")
int_handles.append(handle)
else: # recv
if self.local_connections[peer]:
if self.use_copy_engine_for_intra:
ptr, size = self._get_buffer_info(tensor)
conn_id = self.recv_connections[peer]
ok, handle = self.ep.recv_ipc_async(conn_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed IPC recv from {peer}")
int_handles.append(handle)
else:
p2p_ops.append(
dist.P2POp(dist.irecv, tensor, peer, group=self.group)
)
if self.local_connections[peer] and not self.use_copy_engine_for_intra:
p2p_ops.append(
dist.P2POp(dist.irecv, tensor, peer, group=self.group)
)
else:
ptr, size = self._get_buffer_info(tensor)
conn_id = self.recv_connections[peer]
mr_id = self.check_tensor_registered(tensor)
mr_id = (
0
if self.local_connections[peer]
else self.check_tensor_registered(tensor)
)
if mr_id is None:
raise RuntimeError(f"Tensor not registered for rank {peer}")
ok, handle = self.ep.recv_async(conn_id, mr_id, ptr, size)
if not ok:
raise RuntimeError(f"Failed RDMA recv from {peer}")
raise RuntimeError(f"Failed recv from {peer}")
int_handles.append(handle)

# Batch execute NCCL P2POps
Expand Down
Loading