
Commit 3a6cf2e

TroyGarden authored and meta-codesync[bot] committed
memory snapshot and footprint for non-blocking copy (#3485)
Summary:
Pull Request resolved: #3485

Workplace post: https://fb.workplace.com/groups/pytorch.dev/permalink/1922774931634311/

google doc: https://docs.google.com/document/d/1pBRMknSAmuD0-ybWHwv1ZrPT6H-dYqNyjmxSW_6djEs

# context
* How to do a timely, efficient copy of a tensor from host to device is well explained in [A guide on good usage of non_blocking and pin_memory() in PyTorch](https://docs.pytorch.org/tutorials/intermediate/pinmem_nonblock.html).
* The two requirements to enable non-blocking data transfer on both the host and the device sides are:
  * **use pin_memory on the host side**
  * **assign a side cuda stream for the data transfer**
* a more realistic example, which uses the data on the device after the data transfer:
```
# data often come from a dataloader on the host side
host_tensor = dataloader().pin_memory()
main_stream = torch.cuda.current_stream()
side_stream = torch.cuda.Stream()

# use a side stream for non-blocking data transfer;
# without this side stream the data transfer runs on the current (main) stream
with side_stream:
    device_tensor = host_tensor.to(device='cuda', non_blocking=True)
    # prevent device_tensor from being freed on the side stream
    # (since there's no follow-up usage on the side stream)
    device_tensor.record_stream(main_stream)

# the device can do some irrelevant compute in the meantime
some_function()

# the main stream needs the data, so it has to wait for the data transfer to complete
main_stream.wait_stream(side_stream)
use_the_data(device_tensor)
```
* a small example also confirms this behavior (see the reproduce section later): with non-blocking data transfer, both cpu and gpu executions are non-blocking, and the data transfer starts immediately alongside the cpu execution; without an extra side stream for the data transfer, the host-to-device copy runs on the main cuda stream and only starts when that stream is available (delayed). {F1983001559}

# how about memory efficiency
* However, memory consumption (footprint) is not considered in the discussion above.
* As explained in this blog: [A guide to PyTorch's CUDA Caching Allocator](https://zdevito.github.io/2022/08/04/cuda-caching-allocator.html), a tensor created in a side stream won't be shared with other streams.
* In other words, although the tensor copied from the host is primarily used in the main stream, it is created in the side stream, so the caching allocator collects it and returns it to the side stream. The freed memory goes to the "reserved memory" associated with the side stream and cannot be (re-)used by other operations in the main stream. The cuda memory footprint comparison is shown in the diagram below. {F1983002150}
* the memory snapshots show similar maximum memory usage in both scenarios in the **active memory timeline**, but the **active cached segment timeline** reveals a different overall memory usage (footprint) {F1983001762} {F1983001764}

# is it the price we must pay
* Not necessarily. We can work around this by pre-allocating the memory from the main stream and using an in-place copy to do the data transfer on the side stream (see the sketch after this list). Just make sure the main stream waits for the side stream before using the transferred data; once the data is deleted, its memory can be re-used by the main stream again. diagram shown below {F1983002149}
* This idea is also verified in a small example (also included in the reproduce section). The trace comparison shows that the executions are non-blocking on both the cpu and gpu sides. {F1983001257}
* the **active memory timeline** shows a similar pattern between the "pre-allocated non-blocking copy" and the "non-blocking copy", but in the **active cached segment timeline** the "pre-allocated non-blocking copy" has a smaller memory usage (same as the "blocking copy") than the "non-blocking copy". {F1983001817} {F1983001823}
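As a reference, the workaround boils down to the following minimal standalone sketch; it is illustrative only (`make_host_batch`, the tensor shape, and the final `sum()` are placeholder choices), and the actual benchmark implementation is in the diff further down.

```
import torch

def make_host_batch() -> torch.Tensor:
    # stand-in for a dataloader batch; pinned memory is required for a truly async H2D copy
    return torch.rand(1024, 1024).pin_memory()

main_stream = torch.cuda.current_stream()
side_stream = torch.cuda.Stream()

# allocate the destination while the main stream is current, so the caching
# allocator associates the segment with the main stream rather than the side stream
device_buf = torch.empty(1024, 1024, device="cuda")

host_batch = make_host_batch()
with torch.cuda.stream(side_stream):
    # in-place copy into the pre-allocated buffer; the transfer runs on the side stream
    device_buf.copy_(host_batch, non_blocking=True)

# ... the main stream can run unrelated kernels here ...

# the main stream must wait for the side stream before reading the buffer
main_stream.wait_stream(side_stream)
result = device_buf.sum()
```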
# discussions
* are all problems solved? unfortunately not yet: we noticed that the host-to-device data transfer is much slower with the in-place copy than with the regular copy (5.8 GB/s vs 11.8 GB/s), regardless of blocking or non-blocking data transfer (shown in the first screenshot).
* the development experience is poor with the pre-allocation approach: the user has to explicitly figure out the input data size, and in a common use case the inputs are wrapped in the `ModelInput` class with a complex data structure, which often only implements the `.to(...)` method, not the in-place copy.
* how much headroom? most of the production models use (some kind of) TorchRec's train pipeline, which shares the same `copy_batch_to_gpu` method. Estimating from a few production models we have been working with, the input KJT is about 1~3 GB on each rank. Also worth mentioning that, judging from the trace, in most cases the `copy_batch_to_gpu` step isn't very long (it can afford 50% slowness), and in some suboptimal use cases the input data does not use `pin_memory()`, causing cpu-side blocking.
* my wish: is it possible for PyTorch to make an async host-to-device transfer execute on a side stream while attributing the transferred data to the main stream? something like:
```
host_tensor = dataloader().pin_memory()
main_stream = torch.cuda.current_stream()
side_stream = torch.cuda.Stream()

with side_stream:
    # run the data transfer on the side stream
    tensor = host_tensor.to("cuda", non_blocking=True, target_stream=main_stream)
...
# use the transferred data
main_stream.wait_stream(side_stream)
do_something(tensor)
```

# benchmark
|name|GPU Peak Memory alloc|GPU Peak Memory reserved|CPU Peak RSS|
|--|--|--|--|
|blocking_copy|0.13 GB|0.13 GB|1.49 GB|
|non_blocking_copy|0.13 GB|**0.15 GB**|1.49 GB|
|preallocated_non_blocking_copy|0.13 GB|0.13 GB|1.49 GB|

# reproduce
* blocking copy
```
python -m torchrec/distributed/benchmark/benchmark_comms.py -- \
   a2a_single --memory_snapshot=1 --num_mul=20 \
   --name=blocking_copy
```
* non-blocking copy
```
python -m torchrec/distributed/benchmark/benchmark_comms.py -- \
   a2a_single --memory_snapshot=1 --num_mul=20 \
   --name=non_blocking_copy
```
* non-blocking copy with preallocated memory
```
python -m torchrec/distributed/benchmark/benchmark_comms.py -- \
   a2a_single --memory_snapshot=1 --num_mul=20 \
   --name=preallocated_non_blocking_copy
```

Reviewed By: spmex

Differential Revision: D85508674

fbshipit-source-id: a7f64a70c6e452d330d544e4fe80d55dab862492
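The memory snapshots discussed in the summary (the active memory timeline and active cached segment timeline) can also be captured in a standalone script via PyTorch's snapshot API; below is a minimal sketch, assuming a recent PyTorch where the underscore-prefixed snapshot hooks are available (`run_workload` is a placeholder for one of the copy variants above).

```
import torch

# start recording allocator events (with stack traces) for the memory snapshot
torch.cuda.memory._record_memory_history(max_entries=100_000)

run_workload()  # placeholder: e.g. one of the copy variants benchmarked above

# dump a pickle that can be opened at https://pytorch.org/memory_viz to inspect the
# "Active Memory Timeline" and "Active Cached Segment Timeline" views
torch.cuda.memory._dump_snapshot("copy_variant_snapshot.pickle")

# stop recording
torch.cuda.memory._record_memory_history(enabled=None)
```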
1 parent e6e5e8f commit 3a6cf2e

File tree

1 file changed: +89 −16 lines changed


torchrec/distributed/benchmark/benchmark_comms.py

Lines changed: 89 additions & 16 deletions
```diff
@@ -475,6 +475,74 @@ def multi_stream_optimized(
         assert checks.item()
 
 
+# an optimized version of multi-stream memory footprint
+def non_blocking_copy(
+    _batch_inputs: List[Dict[str, Any]],
+    dim: int,
+    num_mul: int,
+    num_concat: int,
+    ctx: MultiProcessContext,
+    preallocated: bool = False,
+    **_kwargs: Dict[str, Any],
+) -> None:
+    with record_function("## setup ##"):
+        main_stream = torch.cuda.current_stream()
+        data_copy_stream = torch.cuda.Stream()
+        irrelevant_data = torch.rand(dim, dim, device=ctx.device) - 0.5
+
+        # the host to device data transfer will block cuda execution without the `pin_memory()`
+        host_data = (torch.rand(dim, dim) - 0.5).pin_memory()
+        if preallocated:
+            # pre-allocate memory on the device for the incoming data transfer from the host
+            device_data = torch.empty_like(host_data, device=ctx.device)
+        else:
+            device_data = torch.empty(0, device=ctx.device)
+
+    with record_function("## irrelevant compute before h2d ##"):
+        pre_comms = _compute(
+            dim=dim, num_mul=num_mul, num_concat=1, ctx=ctx, x=irrelevant_data
+        )
+
+    with record_function("## copy data to device ##"):
+        with data_copy_stream:
+            if preallocated:
+                # copy data to device, this will not block the main stream
+                device_data.copy_(host_data, non_blocking=True)
+            else:
+                device_data = host_data.to(ctx.device, non_blocking=True)
+
+    with record_function("## irrelevant compute after h2d ##"):
+        irrelevant_data = torch.rand(dim, dim, device=ctx.device) - 0.5
+        pre_comms = _compute(
+            dim=dim, num_mul=num_mul, num_concat=1, ctx=ctx, x=irrelevant_data
+        )
+
+    with record_function("## pre-comms compute ##"):
+        # make sure the data copy is done before the pre-comms compute
+        main_stream.wait_stream(data_copy_stream)
+        pre_comms = _compute(
+            dim=dim, num_mul=num_mul, num_concat=1, ctx=ctx, x=device_data
+        )
+
+
+def preallocated_non_blocking_copy(
+    _batch_inputs: List[Dict[str, Any]],
+    dim: int,
+    num_mul: int,
+    num_concat: int,
+    ctx: MultiProcessContext,
+    **_kwargs: Dict[str, Any],
+) -> None:
+    return non_blocking_copy(
+        _batch_inputs=_batch_inputs,
+        dim=dim,
+        num_mul=num_mul,
+        num_concat=num_concat,
+        ctx=ctx,
+        preallocated=True,
+    )
+
+
 # single-rank runner
 def a2a_single_runner(rank: int, world_size: int, arg: AllToAllSingleRunConfig) -> None:
     # Ensure GPUs are available and we have enough of them
```
```diff
@@ -489,22 +557,27 @@ def a2a_single_runner(rank: int, world_size: int, arg: AllToAllSingleRunConfig)
         backend="nccl",
         use_deterministic_algorithms=False,
     ) as ctx:
-        if arg.name.startswith("a2a_sync_base"):
-            func = a2a_sync_base
-        elif arg.name.startswith("a2a_async_base"):
-            func = a2a_async_base
-        elif arg.name.startswith("a2a_async_twice"):
-            func = a2a_async_twice
-        elif arg.name.startswith("lazyawaitable"):
-            func = lazyawaitable
-        elif arg.name.startswith("multi_stream_memory"):
-            func = multi_stream_memory
-        elif arg.name.startswith("single_stream_memory"):
-            func = single_stream_memory
-        elif arg.name.startswith("multi_stream_optimized"):
-            func = multi_stream_optimized
-        else:
-            raise ValueError(f"Unknown benchmark name: {arg.name}")
+        match arg.name.lower():
+            case "a2a_sync_base":
+                func = a2a_sync_base
+            case "a2a_async_base":
+                func = a2a_async_base
+            case "a2a_async_twice":
+                func = a2a_async_twice
+            case "lazyawaitable":
+                func = lazyawaitable
+            case "multi_stream_memory":
+                func = multi_stream_memory
+            case "single_stream_memory":
+                func = single_stream_memory
+            case "multi_stream_optimized":
+                func = multi_stream_optimized
+            case "non_blocking_copy":
+                func = non_blocking_copy
+            case "preallocated_non_blocking_copy":
+                func = preallocated_non_blocking_copy
+            case _:
+                raise ValueError(f"Unknown benchmark name: {arg.name}")
 
         result = benchmark_func(
             bench_inputs=[],
```
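Finally, the in-place vs. regular copy bandwidth gap called out in the discussion section (5.8 GB/s vs 11.8 GB/s in the author's traces) can be sanity-checked with a standalone timing sketch like the one below. This is not part of the commit; the buffer size and iteration count are arbitrary illustrative choices.

```
import torch

def h2d_gbps(copy_fn, host: torch.Tensor, iters: int = 20) -> float:
    # time `iters` host-to-device copies with CUDA events and return GB/s
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    torch.cuda.synchronize()
    start.record()
    for _ in range(iters):
        copy_fn(host)
    end.record()
    torch.cuda.synchronize()
    seconds = start.elapsed_time(end) / 1e3  # elapsed_time() returns milliseconds
    return host.numel() * host.element_size() * iters / seconds / 1e9

host = torch.rand(256, 1 << 20).pin_memory()  # ~1 GiB of float32 in pinned memory
dst = torch.empty_like(host, device="cuda")

print("regular copy :", h2d_gbps(lambda h: h.to("cuda", non_blocking=True)), "GB/s")
print("in-place copy:", h2d_gbps(lambda h: dst.copy_(h, non_blocking=True)), "GB/s")
```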
