2 changes: 1 addition & 1 deletion examples/complete_cv_example.py
@@ -92,7 +92,7 @@ def training_function(config, args):
    seed = int(config["seed"])
    batch_size = int(config["batch_size"])
    image_size = config["image_size"]
-    if not isinstance(image_size, (list, tuple)):
+    if not isinstance(image_size, list | tuple):
        image_size = (image_size, image_size)

    # Parse out whether we are saving every epoch or after a certain number of batches
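
An aside on the recurring change in this PR: PEP 604 (Python 3.10+) makes `X | Y` a runtime union type that `isinstance` accepts directly, so the tuple-of-types form can be dropped. A minimal, self-contained sketch:

```python
# A minimal sketch of the isinstance modernization applied throughout this
# PR. On Python 3.10+, `list | tuple` is a types.UnionType that isinstance
# accepts, with the same behavior as the old tuple-of-types form.
image_size = 224

# Old spelling: a tuple of types
if not isinstance(image_size, (list, tuple)):
    image_size = (image_size, image_size)

# New spelling: a PEP 604 union, identical behavior on Python 3.10+
assert isinstance(image_size, list | tuple)
print(image_size)  # (224, 224)
```

Note that `list | tuple` raises `TypeError` on Python 3.9 and earlier, so this syntax effectively raises the minimum supported Python version to 3.10.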
2 changes: 1 addition & 1 deletion examples/cv_example.py
@@ -81,7 +81,7 @@ def training_function(config, args):
    seed = int(config["seed"])
    batch_size = int(config["batch_size"])
    image_size = config["image_size"]
-    if not isinstance(image_size, (list, tuple)):
+    if not isinstance(image_size, list | tuple):
        image_size = (image_size, image_size)

    # Grab all the image filenames
10 changes: 6 additions & 4 deletions examples/inference/pippy/README.md
@@ -1,19 +1,21 @@
-# Distributed inference examples with PiPPy
+# Distributed inference examples with Pipeline Parallelism

-This repo contains a variety of tutorials for using the [PiPPy](https://github.com/PyTorch/PiPPy) pipeline parallelism library with accelerate. You will find examples covering:
+This repo contains a variety of tutorials for using PyTorch's built-in pipeline parallelism (`torch.distributed.pipelining`) with accelerate. You will find examples covering:

1. How to trace the model using `accelerate.prepare_pippy`
2. How to specify inputs based on what the model expects (when to use `kwargs`, `args`, and such)
3. How to gather the results at the end.
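
A minimal sketch touching the three points above, modeled on the GPT-2 causal-LM setup these examples use; the exact import path, split behavior, and input handling can vary per accelerate version, so treat this as an outline rather than a drop-in script:

```python
# A minimal sketch, assuming the GPT-2 setup used elsewhere in this folder.
import torch
from accelerate import PartialState, prepare_pippy
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained("gpt2")
model.eval()

# 1. Trace the model into pipeline stages using an example input
example_input = torch.randint(0, 1000, (2, 64), dtype=torch.int64)
model = prepare_pippy(model, example_args=(example_input,))

# 2. Call the wrapped model with inputs shaped like the trace inputs
with torch.no_grad():
    output = model(example_input)

# 3. Outputs land on the last process unless gather_output=True was passed
if PartialState().is_last_process:
    print(output)
```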

## Installation

-This requires the `main` branch of accelerate (or a version at least 0.27.0), `pippy` version of 0.2.0 or greater, and at least python 3.9. Please install using `pip install .` to pull from the `setup.py` in this repo, or run manually:
+This requires the `main` branch of accelerate (or a version at least 0.27.0), PyTorch 2.4.0 or later (which includes `torch.distributed.pipelining`), and at least python 3.9. Please install using `pip install .` to pull from the `setup.py` in this repo, or run manually:

```bash
-pip install 'accelerate>=0.27.0' 'torchpippy>=0.2.0'
+pip install 'accelerate>=0.27.0' 'torch>=2.4.0'
```

+Note: The `torchpippy` package has been merged into PyTorch 2.4.0+ as `torch.distributed.pipelining`. If you're using PyTorch 2.4.0 or later, no additional package installation is needed.

## Running code

You can either use `torchrun` or the recommended way of `accelerate launch` (without needing to run `accelerate config`) on each script:
4 changes: 2 additions & 2 deletions examples/inference/pippy/t5.py
@@ -24,8 +24,8 @@

if version.parse(accelerate_version) > version.parse("0.33.0"):
    raise RuntimeError(
-        "Using encoder/decoder models is not supported with the `torch.pipelining` integration or accelerate>=0.34.0. "
-        "Please use a lower accelerate version and `torchpippy`, which this example uses."
+        "Using encoder/decoder models is not supported with the `torch.distributed.pipelining` integration or accelerate>=0.34.0. "
+        "Please use a lower accelerate version. Note: `torchpippy` has been merged into PyTorch 2.4.0+ as `torch.distributed.pipelining`."
    )

1 change: 0 additions & 1 deletion setup.py
@@ -25,7 +25,6 @@
    "diffusers",
    "evaluate",
    "torchdata>=0.8.0",
-    "torchpippy>=0.2.0",
    "transformers",
    "scipy",
    "scikit-learn",
16 changes: 8 additions & 8 deletions src/accelerate/accelerator.py
@@ -2184,9 +2184,9 @@ def _prepare_deepspeed(self, *args):
        for obj in result:
            if isinstance(obj, torch.nn.Module):
                model = obj
-            elif isinstance(obj, (torch.optim.Optimizer, DummyOptim)):
+            elif isinstance(obj, torch.optim.Optimizer | DummyOptim):
                optimizer = obj
-            elif (isinstance(obj, (LRScheduler, DummyScheduler))) or (
+            elif (isinstance(obj, LRScheduler | DummyScheduler)) or (
                type(obj).__name__ in deepspeed.runtime.lr_schedules.VALID_LR_SCHEDULES
            ):
                scheduler = obj
@@ -2405,9 +2405,9 @@ def _prepare_deepspeed(self, *args):
        for i in range(len(result)):
            if isinstance(result[i], torch.nn.Module):
                result[i] = engine
-            elif isinstance(result[i], (torch.optim.Optimizer, DummyOptim)):
+            elif isinstance(result[i], torch.optim.Optimizer | DummyOptim):
                result[i] = optimizer
-            elif (isinstance(result[i], (LRScheduler, DummyScheduler))) or (
+            elif (isinstance(result[i], LRScheduler | DummyScheduler)) or (
                type(result[i]).__name__ in deepspeed.runtime.lr_schedules.VALID_LR_SCHEDULES
            ):
                result[i] = scheduler
@@ -2488,7 +2488,7 @@ def _prepare_megatron_lm(self, *args):
                model = obj
            elif isinstance(obj, (torch.optim.Optimizer)):
                optimizer = obj
-            elif isinstance(obj, (LRScheduler, MegatronLMDummyScheduler)):
+            elif isinstance(obj, LRScheduler | MegatronLMDummyScheduler):
                scheduler = obj

        if model is not None:
@@ -2757,7 +2757,7 @@ def prepare_optimizer(self, optimizer: torch.optim.Optimizer, device_placement=N
            from lomo_optim import AdaLomo, Lomo

        # Support multiple optimizers: https://github.com/huggingface/accelerate/pull/2695#discussion_r1589164607
-        self.has_lomo_optimizer |= isinstance(optimizer, (Lomo, AdaLomo))
+        self.has_lomo_optimizer |= isinstance(optimizer, Lomo | AdaLomo)

        # Ensure we can't double wrap an optimizer due to `find_batch_size`
        if getattr(optimizer, "_is_accelerate_prepared", False):
@@ -2935,7 +2935,7 @@ def unscale_gradients(self, optimizer=None):
        if optimizer is None:
            # TODO: this unscales all optimizers where we should only unscale the one where parameters are.
            optimizer = self._optimizers
-        elif not isinstance(optimizer, (tuple, list)):
+        elif not isinstance(optimizer, tuple | list):
            optimizer = [optimizer]
        for opt in optimizer:
            while isinstance(opt, AcceleratedOptimizer):
@@ -4324,7 +4324,7 @@ def lomo_backward(self, loss: torch.Tensor, learning_rate: float) -> None:
        _backward_called = False

        for optimizer in self._optimizers:
-            if isinstance(optimizer.optimizer, (Lomo, AdaLomo)):
+            if isinstance(optimizer.optimizer, Lomo | AdaLomo):
                optimizer.optimizer.fused_backward(loss, learning_rate)
                _backward_called = True
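
The `lomo_backward` hunk above dispatches to LOMO's fused backward rather than a plain `loss.backward()`. A self-contained sketch of that control flow, with a stand-in class instead of the real `lomo_optim` types:

```python
# A self-contained sketch of the lomo_backward dispatch above. LomoLike is
# a stand-in for Lomo/AdaLomo, whose fused_backward couples the backward
# pass with the parameter update in a single call.
class LomoLike:
    def fused_backward(self, loss, learning_rate):
        print(f"fused backward + update at lr={learning_rate}")

def lomo_backward_sketch(optimizers, loss, learning_rate):
    backward_called = False
    for opt in optimizers:
        if isinstance(opt, LomoLike):
            opt.fused_backward(loss, learning_rate)
            backward_called = True
    if not backward_called:
        raise ValueError("Backward requires a LOMO optimizer to dispatch to.")

lomo_backward_sketch([LomoLike()], loss=None, learning_rate=3e-4)
```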
2 changes: 1 addition & 1 deletion src/accelerate/commands/estimate.py
@@ -293,7 +293,7 @@ def estimate_command(args):
    data = gather_data(args)
    for row in data:
        for i, item in enumerate(row):
-            if isinstance(item, (int, float)):
+            if isinstance(item, int | float):
                row[i] = convert_bytes(item)
            elif isinstance(item, dict):
                training_usage = max(item.values())
4 changes: 2 additions & 2 deletions src/accelerate/optimizer.py
@@ -26,7 +26,7 @@


def move_to_device(state, device):
-    if isinstance(state, (list, tuple)):
+    if isinstance(state, list | tuple):
        return honor_type(state, (move_to_device(t, device) for t in state))
    elif isinstance(state, dict):
        return type(state)({k: move_to_device(v, device) for k, v in state.items()})
@@ -156,7 +156,7 @@ def step(self, closure=None):

        if is_lomo_available():
            # `step` should be a no-op for LOMO optimizers.
-            if isinstance(self.optimizer, (Lomo, AdaLomo)):
+            if isinstance(self.optimizer, Lomo | AdaLomo):
                return

        if self.gradient_state.sync_gradients:
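
The `move_to_device` hunk above is the structure-preserving recursion accelerate uses in several places. A self-contained sketch of the pattern; `honor_type` is accelerate's helper for rebuilding namedtuples, and the plain `type(...)` used here for brevity would break on them:

```python
import torch

def move_to_device_sketch(state, device):
    # Rebuild lists/tuples with each element moved (namedtuples would need
    # accelerate's honor_type helper rather than plain type(state)).
    if isinstance(state, list | tuple):
        return type(state)(move_to_device_sketch(t, device) for t in state)
    elif isinstance(state, dict):
        return type(state)({k: move_to_device_sketch(v, device) for k, v in state.items()})
    elif isinstance(state, torch.Tensor):
        return state.to(device)
    return state

nested = {"exp_avg": [torch.zeros(2), (torch.ones(3),)], "lr": 0.1}
print(move_to_device_sketch(nested, "cpu"))
```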
2 changes: 1 addition & 1 deletion src/accelerate/scheduler.py
@@ -46,7 +46,7 @@ class AcceleratedScheduler:

    def __init__(self, scheduler, optimizers, step_with_optimizer: bool = True, split_batches: bool = False):
        self.scheduler = scheduler
-        self.optimizers = optimizers if isinstance(optimizers, (list, tuple)) else [optimizers]
+        self.optimizers = optimizers if isinstance(optimizers, list | tuple) else [optimizers]
        self.split_batches = split_batches
        self.step_with_optimizer = step_with_optimizer
        self.gradient_state = GradientState()
2 changes: 1 addition & 1 deletion src/accelerate/state.py
@@ -466,7 +466,7 @@ def split_between_processes(self, inputs: list | tuple | dict | torch.Tensor, ap
        end_index = start_index + num_samples_per_process + (1 if self.process_index < num_extras else 0)

        def _split_values(inputs, start_index, end_index):
-            if isinstance(inputs, (list, tuple, torch.Tensor)):
+            if isinstance(inputs, list | tuple | torch.Tensor):
                if start_index >= len(inputs):
                    result = inputs[-1:]
                else:
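
The arithmetic surrounding the `split_between_processes` hunk distributes any remainder to the first processes. A simplified sketch of just that slicing logic, with names shortened from the original:

```python
# A sketch of the slice arithmetic around the hunk above: the first
# `num_extras` processes each take one extra element when the input
# length is not evenly divisible by the number of processes.
def split_slice(length, num_processes, process_index):
    per_proc, num_extras = divmod(length, num_processes)
    start = process_index * per_proc + min(process_index, num_extras)
    end = start + per_proc + (1 if process_index < num_extras else 0)
    return start, end

print([split_slice(10, 4, rank) for rank in range(4)])
# [(0, 3), (3, 6), (6, 8), (8, 10)]
```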
7 changes: 4 additions & 3 deletions src/accelerate/test_utils/testing.py
@@ -519,8 +519,9 @@ def require_mlflow(test_case):

def require_pippy(test_case):
    """
-    Decorator marking a test that requires pippy installed. These tests are skipped when pippy isn't installed It is
-    also checked if the test is running on a Gaudi1 device which doesn't support pippy.
+    Decorator marking a test that requires PyTorch 2.4.0+ (which includes torch.distributed.pipelining).
+    These tests are skipped when PyTorch < 2.4.0. It is also checked if the test is running on a Gaudi1 device
+    which doesn't support pippy.
    """
    return unittest.skipUnless(is_pippy_available() and not is_habana_gaudi1(), "test requires pippy")(test_case)

@@ -687,7 +688,7 @@ def add_mocks(self, mocks: Union[mock.Mock, list[mock.Mock]]):
            mocks (`mock.Mock` or list of `mock.Mock`):
                Mocks that should be added to the `TestCase` after `TestCase.setUpClass` has been run
        """
-        self.mocks = mocks if isinstance(mocks, (tuple, list)) else [mocks]
+        self.mocks = mocks if isinstance(mocks, tuple | list) else [mocks]
        for m in self.mocks:
            m.start()
            self.addCleanup(m.stop)
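
The updated `require_pippy` docstring describes a version-gated skip. A hedged sketch of that pattern with a hypothetical decorator name; the real helper relies on accelerate's `is_pippy_available` plus a Gaudi1 check rather than comparing versions directly:

```python
import unittest

import torch
from packaging import version

def require_pipelining(test_case):  # hypothetical name, for illustration
    # Skip unless torch >= 2.4.0, which ships torch.distributed.pipelining
    ok = version.parse(torch.__version__).release >= (2, 4, 0)
    return unittest.skipUnless(ok, "test requires torch>=2.4.0")(test_case)

@require_pipelining
class PipelineTest(unittest.TestCase):
    def test_noop(self):
        self.assertTrue(True)
```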
10 changes: 5 additions & 5 deletions src/accelerate/tracking.py
@@ -259,7 +259,7 @@ def log(self, values: dict, step: Optional[int] = None, **kwargs):
        """
        values = listify(values)
        for k, v in values.items():
-            if isinstance(v, (int, float)):
+            if isinstance(v, int | float):
                self.writer.add_scalar(k, v, global_step=step, **kwargs)
            elif isinstance(v, str):
                self.writer.add_text(k, v, global_step=step, **kwargs)
@@ -582,7 +582,7 @@ def log(self, values: dict, step: Optional[int] = None, **kwargs):
        if step is not None:
            self.writer.set_step(step)
        for k, v in values.items():
-            if isinstance(v, (int, float)):
+            if isinstance(v, int | float):
                self.writer.log_metric(k, v, step=step, **kwargs)
            elif isinstance(v, str):
                self.writer.log_other(k, v, **kwargs)
@@ -834,7 +834,7 @@ def log(self, values: dict, step: Optional[int]):
        """
        metrics = {}
        for k, v in values.items():
-            if isinstance(v, (int, float)):
+            if isinstance(v, int | float):
                metrics[k] = v
            else:
                logger.warning_once(
@@ -980,7 +980,7 @@ def log(self, values: dict[str, Union[int, float]], step: Optional[int] = None,
        """
        clearml_logger = self.task.get_logger()
        for k, v in values.items():
-            if not isinstance(v, (int, float)):
+            if not isinstance(v, int | float):
                logger.warning_once(
                    "Accelerator is attempting to log a value of "
                    f'"{v}" of type {type(v)} for key "{k}" as a scalar. '
@@ -1299,7 +1299,7 @@ def filter_trackers(
    """
    loggers = []
    if log_with is not None:
-        if not isinstance(log_with, (list, tuple)):
+        if not isinstance(log_with, list | tuple):
            log_with = [log_with]
        if "all" in log_with or LoggerType.ALL in log_with:
            loggers = [o for o in log_with if issubclass(type(o), GeneralTracker)] + get_available_trackers()
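
All five tracking.py hunks share one dispatch shape: scalars go to a scalar endpoint, strings to a text endpoint, and everything else is warned about. A condensed sketch of that shape; `writer` is a stand-in object, not any real tracker's API:

```python
# A condensed sketch of the dispatch shared by the tracker hunks above.
def log_values(writer, values: dict, step: int | None = None):
    for k, v in values.items():
        if isinstance(v, int | float):
            writer.add_scalar(k, v, global_step=step)   # scalar endpoint
        elif isinstance(v, str):
            writer.add_text(k, v, global_step=step)     # text endpoint
        else:
            # the real trackers use logger.warning_once here
            print(f"Skipping {k!r}: values of type {type(v).__name__} are not logged")
```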
2 changes: 1 addition & 1 deletion src/accelerate/utils/dataclasses.py
@@ -2906,7 +2906,7 @@ def __post_init__(self):
        if not self.load_in_4bit and not self.load_in_8bit:
            raise ValueError("load_in_4bit and load_in_8bit can't be both False")

-        if not isinstance(self.llm_int8_threshold, (int, float)):
+        if not isinstance(self.llm_int8_threshold, int | float):
            raise ValueError("llm_int8_threshold must be a float or an int")

        if not isinstance(self.bnb_4bit_quant_type, str):
6 changes: 3 additions & 3 deletions src/accelerate/utils/deepspeed.py
@@ -50,7 +50,7 @@ def map_pytorch_optim_to_deepspeed(optimizer):
    if is_bnb_available() and not is_adaw:
        import bitsandbytes.optim as bnb_opt

-        if isinstance(optimizer, (bnb_opt.AdamW, bnb_opt.AdamW32bit)):
+        if isinstance(optimizer, bnb_opt.AdamW | bnb_opt.AdamW32bit):
            try:
                is_adaw = optimizer.optim_bits == 32
            except AttributeError:
@@ -70,7 +70,7 @@ def map_pytorch_optim_to_deepspeed(optimizer):
    if is_bnb_available() and not is_ada:
        import bitsandbytes.optim as bnb_opt

-        if isinstance(optimizer, (bnb_opt.Adagrad, bnb_opt.Adagrad32bit)):
+        if isinstance(optimizer, bnb_opt.Adagrad | bnb_opt.Adagrad32bit):
            try:
                is_ada = optimizer.optim_bits == 32
            except AttributeError:
@@ -84,7 +84,7 @@ def map_pytorch_optim_to_deepspeed(optimizer):
    if is_bnb_available(min_version="0.38.0") and compare_versions("deepspeed", ">=", "0.11.0"):
        from bitsandbytes.optim import Lion, Lion32bit

-        if isinstance(optimizer, (Lion, Lion32bit)):
+        if isinstance(optimizer, Lion | Lion32bit):
            try:
                is_bnb_32bits = optimizer.optim_bits == 32
            except AttributeError:
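
Each deepspeed.py hunk guards the same probe: bitsandbytes optimizers expose `optim_bits`, but not on every class or version, hence the `AttributeError` fallback. A sketch of the probe in isolation:

```python
# A sketch of the optim_bits probe used in all three hunks above: treat an
# optimizer as 32-bit only if it says so, falling back to False when the
# attribute is missing entirely.
def is_32bit(optimizer) -> bool:
    try:
        return optimizer.optim_bits == 32
    except AttributeError:
        return False

class FakeOpt:
    optim_bits = 32

print(is_32bit(FakeOpt()))  # True
print(is_32bit(object()))   # False
```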
6 changes: 3 additions & 3 deletions src/accelerate/utils/megatron_lm.py
@@ -200,11 +200,11 @@ def set_megatron_data_args(self):
            setattr(args, key, value)

    def get_train_valid_test_datasets_provider(self, accelerator):
-        def train_valid_test_datasets_provider(train_val_test_num_samples):
+        def _default_train_valid_test_datasets_provider(train_val_test_num_samples):
            """Build train, valid, and test datasets."""
            args = get_args()
            dataset_args = {
-                "data_prefix": args.data_path if isinstance(args.data_path, (list, tuple)) else [args.data_path],
+                "data_prefix": args.data_path if isinstance(args.data_path, list | tuple) else [args.data_path],
                "splits_string": args.split,
                "train_valid_test_num_samples": train_val_test_num_samples,
                "seed": args.seed,
@@ -257,7 +257,7 @@ def train_valid_test_datasets_provider(train_val_test_num_samples):
                return train_valid_test_datasets_provider
        except ImportError:
            pass
-        return train_valid_test_datasets_provider
+        return _default_train_valid_test_datasets_provider

    def build_train_valid_test_data_iterators(self, accelerator):
        args = get_args()
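
The rename in megatron_lm.py removes a name-shadowing hazard: the inner fallback function shared its name with the provider returned inside the `try` block, so the final `return` was ambiguous to readers and fragile under edits. A minimal sketch of the hazard and the fix:

```python
# A minimal sketch of the shadowing hazard the rename above removes: when
# the fallback function and the try-block's preferred object share a name,
# the final return silently depends on which binding survived.
def get_provider(use_library=False):
    def _default_provider(n):  # renamed, so it cannot be shadowed below
        return f"default({n})"
    try:
        if not use_library:
            raise ImportError
        provider = lambda n: f"library({n})"  # stands in for a megatron import
        return provider
    except ImportError:
        pass
    return _default_provider

print(get_provider()(3))      # default(3)
print(get_provider(True)(3))  # library(3)
```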
4 changes: 2 additions & 2 deletions src/accelerate/utils/modeling.py
@@ -1000,7 +1000,7 @@ def get_balanced_memory(
    #   - the mean of the layer sizes
    if no_split_module_classes is None:
        no_split_module_classes = []
-    elif not isinstance(no_split_module_classes, (list, tuple)):
+    elif not isinstance(no_split_module_classes, list | tuple):
        no_split_module_classes = [no_split_module_classes]

    # Identify the size of the no_split_block modules
@@ -1089,7 +1089,7 @@ def _init_infer_auto_device_map(
    max_memory = get_max_memory(max_memory)
    if no_split_module_classes is None:
        no_split_module_classes = []
-    elif not isinstance(no_split_module_classes, (list, tuple)):
+    elif not isinstance(no_split_module_classes, list | tuple):
        no_split_module_classes = [no_split_module_classes]

    devices = list(max_memory.keys())
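
Both modeling.py hunks use the same normalize-an-argument idiom: accept `None`, a single value, or a sequence, and always end up with a list. A sketch:

```python
# A sketch of the normalization idiom in both hunks above: None, a single
# value, or a sequence all become a plain list.
def normalize_no_split(no_split_module_classes=None):
    if no_split_module_classes is None:
        no_split_module_classes = []
    elif not isinstance(no_split_module_classes, list | tuple):
        no_split_module_classes = [no_split_module_classes]
    return list(no_split_module_classes)

print(normalize_no_split())            # []
print(normalize_no_split("Block"))     # ['Block']
print(normalize_no_split(("A", "B")))  # ['A', 'B']
```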
16 changes: 8 additions & 8 deletions src/accelerate/utils/operations.py
@@ -104,7 +104,7 @@ def recursively_apply(func, data, *args, test_type=is_torch_tensor, error_on_oth
    Returns:
        The same data structure as `data` with `func` applied to every object of type `main_type`.
    """
-    if isinstance(data, (tuple, list)):
+    if isinstance(data, tuple | list):
        return honor_type(
            data,
            (
@@ -166,7 +166,7 @@ def send_to_device(tensor, device, non_blocking=False, skip_keys=None):
            return tensor.to(device, non_blocking=non_blocking)
        except TypeError:  # .to() doesn't accept non_blocking as kwarg
            return tensor.to(device)
-    elif isinstance(tensor, (tuple, list)):
+    elif isinstance(tensor, tuple | list):
        return honor_type(
            tensor, (send_to_device(t, device, non_blocking=non_blocking, skip_keys=skip_keys) for t in tensor)
        )
@@ -245,10 +245,10 @@ def find_batch_size(data):
    Returns:
        `int`: The batch size.
    """
-    if isinstance(data, (tuple, list, Mapping)) and (len(data) == 0):
+    if isinstance(data, tuple | list | Mapping) and (len(data) == 0):
        raise ValueError(f"Cannot find the batch size from empty {type(data)}.")

-    if isinstance(data, (tuple, list)):
+    if isinstance(data, tuple | list):
        return find_batch_size(data[0])
    elif isinstance(data, Mapping):
        for k in data.keys():
@@ -470,7 +470,7 @@ def _gpu_broadcast_one(tensor, src=0):


def _tpu_broadcast(tensor, src=0, name="broadcast tensor"):
-    if isinstance(tensor, (list, tuple)):
+    if isinstance(tensor, list | tuple):
        return honor_type(tensor, (_tpu_broadcast(t, name=f"{name}_{i}") for i, t in enumerate(tensor)))
    elif isinstance(tensor, Mapping):
        return type(tensor)({k: _tpu_broadcast(v, name=f"{name}_{k}") for k, v in tensor.items()})
@@ -612,13 +612,13 @@ def concatenate(data, dim=0):
    Returns:
        The same data structure as `data` with all the tensors concatenated.
    """
-    if isinstance(data[0], (tuple, list)):
+    if isinstance(data[0], tuple | list):
        return honor_type(data[0], (concatenate([d[i] for d in data], dim=dim) for i in range(len(data[0]))))
    elif isinstance(data[0], Mapping):
        return type(data[0])({k: concatenate([d[k] for d in data], dim=dim) for k in data[0].keys()})
    elif isinstance(data[0], torch.Tensor):
        return torch.cat(data, dim=dim)
-    elif isinstance(data, (tuple, list)) and len(data) == 1:
+    elif isinstance(data, tuple | list) and len(data) == 1:
        return data[0]
    else:
        raise TypeError(f"Can only concatenate tensors but got {type(data[0])}")
@@ -840,7 +840,7 @@ def find_device(data):
            device = find_device(obj)
            if device is not None:
                return device
-    elif isinstance(data, (tuple, list)):
+    elif isinstance(data, tuple | list):
        for obj in data:
            device = find_device(obj)
            if device is not None:
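
The operations.py hunks all touch the same recursive descent over nested containers. A self-contained sketch of `find_batch_size`'s logic, condensed from the hunk above:

```python
# A self-contained sketch of find_batch_size's recursion: descend into the
# first element/value until a tensor is found, then report its leading dim.
from collections.abc import Mapping

import torch

def find_batch_size_sketch(data):
    if isinstance(data, tuple | list | Mapping) and len(data) == 0:
        raise ValueError(f"Cannot find the batch size from empty {type(data)}.")
    if isinstance(data, tuple | list):
        return find_batch_size_sketch(data[0])
    elif isinstance(data, Mapping):
        for k in data.keys():
            return find_batch_size_sketch(data[k])
    elif isinstance(data, torch.Tensor):
        return data.shape[0]

batch = {"input_ids": torch.zeros(8, 128), "labels": torch.zeros(8)}
print(find_batch_size_sketch(batch))  # 8
```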
2 changes: 1 addition & 1 deletion tests/test_tracking.py
@@ -594,7 +594,7 @@ def test_swanlab(self):
            if record is None:
                continue
            models_parser.parse_record(record)
-        header, project, experiment, logs, runtime, columns, scalars, medias, footer = models_parser.get_parsed()
+        header, project, experiment, logs, runtime, _columns, scalars, medias, _footer = models_parser.get_parsed()

        # test file header
        assert header.backup_type == "DEFAULT"