diff --git a/examples/complete_cv_example.py b/examples/complete_cv_example.py
index 9fa9fceb853..b1899a8686d 100644
--- a/examples/complete_cv_example.py
+++ b/examples/complete_cv_example.py
@@ -92,7 +92,7 @@ def training_function(config, args):
     seed = int(config["seed"])
     batch_size = int(config["batch_size"])
     image_size = config["image_size"]
-    if not isinstance(image_size, (list, tuple)):
+    if not isinstance(image_size, list | tuple):
         image_size = (image_size, image_size)
 
     # Parse out whether we are saving every epoch or after a certain number of batches
diff --git a/examples/cv_example.py b/examples/cv_example.py
index f7e268ee661..5da2e08fc8e 100644
--- a/examples/cv_example.py
+++ b/examples/cv_example.py
@@ -81,7 +81,7 @@ def training_function(config, args):
     seed = int(config["seed"])
     batch_size = int(config["batch_size"])
     image_size = config["image_size"]
-    if not isinstance(image_size, (list, tuple)):
+    if not isinstance(image_size, list | tuple):
         image_size = (image_size, image_size)
 
     # Grab all the image filenames
diff --git a/examples/inference/pippy/README.md b/examples/inference/pippy/README.md
index 9bfa2741395..96683393bb3 100644
--- a/examples/inference/pippy/README.md
+++ b/examples/inference/pippy/README.md
@@ -1,6 +1,6 @@
-# Distributed inference examples with PiPPy
+# Distributed inference examples with Pipeline Parallelism
 
-This repo contains a variety of tutorials for using the [PiPPy](https://github.com/PyTorch/PiPPy) pipeline parallelism library with accelerate. You will find examples covering:
+This repo contains a variety of tutorials for using PyTorch's built-in pipeline parallelism (`torch.distributed.pipelining`) with accelerate. You will find examples covering:
 
 1. How to trace the model using `accelerate.prepare_pippy`
 2. How to specify inputs based on what the model expects (when to use `kwargs`, `args`, and such)
@@ -8,12 +8,14 @@ This repo contains a variety of tutorials for using the [PiPPy](https://github.c
 
 ## Installation
 
-This requires the `main` branch of accelerate (or a version at least 0.27.0), `pippy` version of 0.2.0 or greater, and at least python 3.9. Please install using `pip install .` to pull from the `setup.py` in this repo, or run manually:
+This requires the `main` branch of accelerate (or a version of at least 0.27.0), PyTorch 2.4.0 or later (which includes `torch.distributed.pipelining`), and at least Python 3.9. Please install using `pip install .` to pull from the `setup.py` in this repo, or run manually:
 
 ```bash
-pip install 'accelerate>=0.27.0' 'torchpippy>=0.2.0'
+pip install 'accelerate>=0.27.0' 'torch>=2.4.0'
 ```
 
+Note: The `torchpippy` package has been merged into PyTorch 2.4.0+ as `torch.distributed.pipelining`. If you're using PyTorch 2.4.0 or later, no additional package installation is needed.
+
 ## Running code
 
 You can either use `torchrun` or the recommended way of `accelerate launch` (without needing to run `accelerate config`) on each script:
diff --git a/examples/inference/pippy/t5.py b/examples/inference/pippy/t5.py
index b134eb5372c..c70bfeccd15 100644
--- a/examples/inference/pippy/t5.py
+++ b/examples/inference/pippy/t5.py
@@ -24,8 +24,8 @@
 
 if version.parse(accelerate_version) > version.parse("0.33.0"):
     raise RuntimeError(
-        "Using encoder/decoder models is not supported with the `torch.pipelining` integration or accelerate>=0.34.0. "
-        "Please use a lower accelerate version and `torchpippy`, which this example uses."
+ "Using encoder/decoder models is not supported with the `torch.distributed.pipelining` integration or accelerate>=0.34.0. " + "Please use a lower accelerate version. Note: `torchpippy` has been merged into PyTorch 2.4.0+ as `torch.distributed.pipelining`." ) diff --git a/setup.py b/setup.py index d4b498c5467..9ae0267a028 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,6 @@ "diffusers", "evaluate", "torchdata>=0.8.0", - "torchpippy>=0.2.0", "transformers", "scipy", "scikit-learn", diff --git a/src/accelerate/accelerator.py b/src/accelerate/accelerator.py index ce6b336bcce..bac051a0c0a 100755 --- a/src/accelerate/accelerator.py +++ b/src/accelerate/accelerator.py @@ -2184,9 +2184,9 @@ def _prepare_deepspeed(self, *args): for obj in result: if isinstance(obj, torch.nn.Module): model = obj - elif isinstance(obj, (torch.optim.Optimizer, DummyOptim)): + elif isinstance(obj, torch.optim.Optimizer | DummyOptim): optimizer = obj - elif (isinstance(obj, (LRScheduler, DummyScheduler))) or ( + elif (isinstance(obj, LRScheduler | DummyScheduler)) or ( type(obj).__name__ in deepspeed.runtime.lr_schedules.VALID_LR_SCHEDULES ): scheduler = obj @@ -2405,9 +2405,9 @@ def _prepare_deepspeed(self, *args): for i in range(len(result)): if isinstance(result[i], torch.nn.Module): result[i] = engine - elif isinstance(result[i], (torch.optim.Optimizer, DummyOptim)): + elif isinstance(result[i], torch.optim.Optimizer | DummyOptim): result[i] = optimizer - elif (isinstance(result[i], (LRScheduler, DummyScheduler))) or ( + elif (isinstance(result[i], LRScheduler | DummyScheduler)) or ( type(result[i]).__name__ in deepspeed.runtime.lr_schedules.VALID_LR_SCHEDULES ): result[i] = scheduler @@ -2488,7 +2488,7 @@ def _prepare_megatron_lm(self, *args): model = obj elif isinstance(obj, (torch.optim.Optimizer)): optimizer = obj - elif isinstance(obj, (LRScheduler, MegatronLMDummyScheduler)): + elif isinstance(obj, LRScheduler | MegatronLMDummyScheduler): scheduler = obj if model is not None: @@ -2757,7 +2757,7 @@ def prepare_optimizer(self, optimizer: torch.optim.Optimizer, device_placement=N from lomo_optim import AdaLomo, Lomo # Support multiple optimizers: https://github.com/huggingface/accelerate/pull/2695#discussion_r1589164607 - self.has_lomo_optimizer |= isinstance(optimizer, (Lomo, AdaLomo)) + self.has_lomo_optimizer |= isinstance(optimizer, Lomo | AdaLomo) # Ensure we can't double wrap an optimizer due to `find_batch_size` if getattr(optimizer, "_is_accelerate_prepared", False): @@ -2935,7 +2935,7 @@ def unscale_gradients(self, optimizer=None): if optimizer is None: # TODO: this unscales all optimizers where we should only unscale the one where parameters are. 
optimizer = self._optimizers - elif not isinstance(optimizer, (tuple, list)): + elif not isinstance(optimizer, tuple | list): optimizer = [optimizer] for opt in optimizer: while isinstance(opt, AcceleratedOptimizer): @@ -4324,7 +4324,7 @@ def lomo_backward(self, loss: torch.Tensor, learning_rate: float) -> None: _backward_called = False for optimizer in self._optimizers: - if isinstance(optimizer.optimizer, (Lomo, AdaLomo)): + if isinstance(optimizer.optimizer, Lomo | AdaLomo): optimizer.optimizer.fused_backward(loss, learning_rate) _backward_called = True diff --git a/src/accelerate/commands/estimate.py b/src/accelerate/commands/estimate.py index a1370e5a76c..d5d9250ee38 100644 --- a/src/accelerate/commands/estimate.py +++ b/src/accelerate/commands/estimate.py @@ -293,7 +293,7 @@ def estimate_command(args): data = gather_data(args) for row in data: for i, item in enumerate(row): - if isinstance(item, (int, float)): + if isinstance(item, int | float): row[i] = convert_bytes(item) elif isinstance(item, dict): training_usage = max(item.values()) diff --git a/src/accelerate/optimizer.py b/src/accelerate/optimizer.py index c1f8faa1543..b03b06a9964 100644 --- a/src/accelerate/optimizer.py +++ b/src/accelerate/optimizer.py @@ -26,7 +26,7 @@ def move_to_device(state, device): - if isinstance(state, (list, tuple)): + if isinstance(state, list | tuple): return honor_type(state, (move_to_device(t, device) for t in state)) elif isinstance(state, dict): return type(state)({k: move_to_device(v, device) for k, v in state.items()}) @@ -156,7 +156,7 @@ def step(self, closure=None): if is_lomo_available(): # `step` should be a no-op for LOMO optimizers. - if isinstance(self.optimizer, (Lomo, AdaLomo)): + if isinstance(self.optimizer, Lomo | AdaLomo): return if self.gradient_state.sync_gradients: diff --git a/src/accelerate/scheduler.py b/src/accelerate/scheduler.py index 1fa8a13f238..7eda4a5c307 100644 --- a/src/accelerate/scheduler.py +++ b/src/accelerate/scheduler.py @@ -46,7 +46,7 @@ class AcceleratedScheduler: def __init__(self, scheduler, optimizers, step_with_optimizer: bool = True, split_batches: bool = False): self.scheduler = scheduler - self.optimizers = optimizers if isinstance(optimizers, (list, tuple)) else [optimizers] + self.optimizers = optimizers if isinstance(optimizers, list | tuple) else [optimizers] self.split_batches = split_batches self.step_with_optimizer = step_with_optimizer self.gradient_state = GradientState() diff --git a/src/accelerate/state.py b/src/accelerate/state.py index 4da3c91f3c2..dfb0f86cee9 100644 --- a/src/accelerate/state.py +++ b/src/accelerate/state.py @@ -466,7 +466,7 @@ def split_between_processes(self, inputs: list | tuple | dict | torch.Tensor, ap end_index = start_index + num_samples_per_process + (1 if self.process_index < num_extras else 0) def _split_values(inputs, start_index, end_index): - if isinstance(inputs, (list, tuple, torch.Tensor)): + if isinstance(inputs, list | tuple | torch.Tensor): if start_index >= len(inputs): result = inputs[-1:] else: diff --git a/src/accelerate/test_utils/testing.py b/src/accelerate/test_utils/testing.py index 92b229ff8e4..3eada295a0a 100644 --- a/src/accelerate/test_utils/testing.py +++ b/src/accelerate/test_utils/testing.py @@ -519,8 +519,9 @@ def require_mlflow(test_case): def require_pippy(test_case): """ - Decorator marking a test that requires pippy installed. These tests are skipped when pippy isn't installed It is - also checked if the test is running on a Gaudi1 device which doesn't support pippy. 
+    Decorator marking a test that requires PyTorch 2.4.0+ (which includes `torch.distributed.pipelining`).
+    These tests are skipped when PyTorch < 2.4.0. They are also skipped when running on a Gaudi1 device,
+    which does not support pipelining.
     """
     return unittest.skipUnless(is_pippy_available() and not is_habana_gaudi1(), "test requires pippy")(test_case)
 
@@ -687,7 +688,7 @@ def add_mocks(self, mocks: Union[mock.Mock, list[mock.Mock]]):
             mocks (`mock.Mock` or list of `mock.Mock`):
                 Mocks that should be added to the `TestCase` after `TestCase.setUpClass` has been run
         """
-        self.mocks = mocks if isinstance(mocks, (tuple, list)) else [mocks]
+        self.mocks = mocks if isinstance(mocks, tuple | list) else [mocks]
         for m in self.mocks:
             m.start()
             self.addCleanup(m.stop)
diff --git a/src/accelerate/tracking.py b/src/accelerate/tracking.py
index ecd722569fc..74188b51ffe 100644
--- a/src/accelerate/tracking.py
+++ b/src/accelerate/tracking.py
@@ -259,7 +259,7 @@ def log(self, values: dict, step: Optional[int] = None, **kwargs):
         """
         values = listify(values)
         for k, v in values.items():
-            if isinstance(v, (int, float)):
+            if isinstance(v, int | float):
                 self.writer.add_scalar(k, v, global_step=step, **kwargs)
             elif isinstance(v, str):
                 self.writer.add_text(k, v, global_step=step, **kwargs)
@@ -582,7 +582,7 @@ def log(self, values: dict, step: Optional[int] = None, **kwargs):
         if step is not None:
             self.writer.set_step(step)
         for k, v in values.items():
-            if isinstance(v, (int, float)):
+            if isinstance(v, int | float):
                 self.writer.log_metric(k, v, step=step, **kwargs)
             elif isinstance(v, str):
                 self.writer.log_other(k, v, **kwargs)
@@ -834,7 +834,7 @@ def log(self, values: dict, step: Optional[int]):
         """
         metrics = {}
         for k, v in values.items():
-            if isinstance(v, (int, float)):
+            if isinstance(v, int | float):
                 metrics[k] = v
             else:
                 logger.warning_once(
@@ -980,7 +980,7 @@ def log(self, values: dict[str, Union[int, float]], step: Optional[int] = None,
         """
         clearml_logger = self.task.get_logger()
         for k, v in values.items():
-            if not isinstance(v, (int, float)):
+            if not isinstance(v, int | float):
                 logger.warning_once(
                     "Accelerator is attempting to log a value of "
                     f'"{v}" of type {type(v)} for key "{k}" as a scalar. 
' @@ -1299,7 +1299,7 @@ def filter_trackers( """ loggers = [] if log_with is not None: - if not isinstance(log_with, (list, tuple)): + if not isinstance(log_with, list | tuple): log_with = [log_with] if "all" in log_with or LoggerType.ALL in log_with: loggers = [o for o in log_with if issubclass(type(o), GeneralTracker)] + get_available_trackers() diff --git a/src/accelerate/utils/dataclasses.py b/src/accelerate/utils/dataclasses.py index b226c25d312..3db78533f35 100644 --- a/src/accelerate/utils/dataclasses.py +++ b/src/accelerate/utils/dataclasses.py @@ -2906,7 +2906,7 @@ def __post_init__(self): if not self.load_in_4bit and not self.load_in_8bit: raise ValueError("load_in_4bit and load_in_8bit can't be both False") - if not isinstance(self.llm_int8_threshold, (int, float)): + if not isinstance(self.llm_int8_threshold, int | float): raise ValueError("llm_int8_threshold must be a float or an int") if not isinstance(self.bnb_4bit_quant_type, str): diff --git a/src/accelerate/utils/deepspeed.py b/src/accelerate/utils/deepspeed.py index 22db891c63d..a56dd3a9a08 100644 --- a/src/accelerate/utils/deepspeed.py +++ b/src/accelerate/utils/deepspeed.py @@ -50,7 +50,7 @@ def map_pytorch_optim_to_deepspeed(optimizer): if is_bnb_available() and not is_adaw: import bitsandbytes.optim as bnb_opt - if isinstance(optimizer, (bnb_opt.AdamW, bnb_opt.AdamW32bit)): + if isinstance(optimizer, bnb_opt.AdamW | bnb_opt.AdamW32bit): try: is_adaw = optimizer.optim_bits == 32 except AttributeError: @@ -70,7 +70,7 @@ def map_pytorch_optim_to_deepspeed(optimizer): if is_bnb_available() and not is_ada: import bitsandbytes.optim as bnb_opt - if isinstance(optimizer, (bnb_opt.Adagrad, bnb_opt.Adagrad32bit)): + if isinstance(optimizer, bnb_opt.Adagrad | bnb_opt.Adagrad32bit): try: is_ada = optimizer.optim_bits == 32 except AttributeError: @@ -84,7 +84,7 @@ def map_pytorch_optim_to_deepspeed(optimizer): if is_bnb_available(min_version="0.38.0") and compare_versions("deepspeed", ">=", "0.11.0"): from bitsandbytes.optim import Lion, Lion32bit - if isinstance(optimizer, (Lion, Lion32bit)): + if isinstance(optimizer, Lion | Lion32bit): try: is_bnb_32bits = optimizer.optim_bits == 32 except AttributeError: diff --git a/src/accelerate/utils/megatron_lm.py b/src/accelerate/utils/megatron_lm.py index e3754098801..6ec4aaf4dbb 100644 --- a/src/accelerate/utils/megatron_lm.py +++ b/src/accelerate/utils/megatron_lm.py @@ -200,11 +200,11 @@ def set_megatron_data_args(self): setattr(args, key, value) def get_train_valid_test_datasets_provider(self, accelerator): - def train_valid_test_datasets_provider(train_val_test_num_samples): + def _default_train_valid_test_datasets_provider(train_val_test_num_samples): """Build train, valid, and test datasets.""" args = get_args() dataset_args = { - "data_prefix": args.data_path if isinstance(args.data_path, (list, tuple)) else [args.data_path], + "data_prefix": args.data_path if isinstance(args.data_path, list | tuple) else [args.data_path], "splits_string": args.split, "train_valid_test_num_samples": train_val_test_num_samples, "seed": args.seed, @@ -257,7 +257,7 @@ def train_valid_test_datasets_provider(train_val_test_num_samples): return train_valid_test_datasets_provider except ImportError: pass - return train_valid_test_datasets_provider + return _default_train_valid_test_datasets_provider def build_train_valid_test_data_iterators(self, accelerator): args = get_args() diff --git a/src/accelerate/utils/modeling.py b/src/accelerate/utils/modeling.py index 830249ffb10..f1f36d30b9b 100644 --- 
a/src/accelerate/utils/modeling.py +++ b/src/accelerate/utils/modeling.py @@ -1000,7 +1000,7 @@ def get_balanced_memory( # - the mean of the layer sizes if no_split_module_classes is None: no_split_module_classes = [] - elif not isinstance(no_split_module_classes, (list, tuple)): + elif not isinstance(no_split_module_classes, list | tuple): no_split_module_classes = [no_split_module_classes] # Identify the size of the no_split_block modules @@ -1089,7 +1089,7 @@ def _init_infer_auto_device_map( max_memory = get_max_memory(max_memory) if no_split_module_classes is None: no_split_module_classes = [] - elif not isinstance(no_split_module_classes, (list, tuple)): + elif not isinstance(no_split_module_classes, list | tuple): no_split_module_classes = [no_split_module_classes] devices = list(max_memory.keys()) diff --git a/src/accelerate/utils/operations.py b/src/accelerate/utils/operations.py index ceec9b457fe..a3c5772d7c1 100644 --- a/src/accelerate/utils/operations.py +++ b/src/accelerate/utils/operations.py @@ -104,7 +104,7 @@ def recursively_apply(func, data, *args, test_type=is_torch_tensor, error_on_oth Returns: The same data structure as `data` with `func` applied to every object of type `main_type`. """ - if isinstance(data, (tuple, list)): + if isinstance(data, tuple | list): return honor_type( data, ( @@ -166,7 +166,7 @@ def send_to_device(tensor, device, non_blocking=False, skip_keys=None): return tensor.to(device, non_blocking=non_blocking) except TypeError: # .to() doesn't accept non_blocking as kwarg return tensor.to(device) - elif isinstance(tensor, (tuple, list)): + elif isinstance(tensor, tuple | list): return honor_type( tensor, (send_to_device(t, device, non_blocking=non_blocking, skip_keys=skip_keys) for t in tensor) ) @@ -245,10 +245,10 @@ def find_batch_size(data): Returns: `int`: The batch size. """ - if isinstance(data, (tuple, list, Mapping)) and (len(data) == 0): + if isinstance(data, tuple | list | Mapping) and (len(data) == 0): raise ValueError(f"Cannot find the batch size from empty {type(data)}.") - if isinstance(data, (tuple, list)): + if isinstance(data, tuple | list): return find_batch_size(data[0]) elif isinstance(data, Mapping): for k in data.keys(): @@ -470,7 +470,7 @@ def _gpu_broadcast_one(tensor, src=0): def _tpu_broadcast(tensor, src=0, name="broadcast tensor"): - if isinstance(tensor, (list, tuple)): + if isinstance(tensor, list | tuple): return honor_type(tensor, (_tpu_broadcast(t, name=f"{name}_{i}") for i, t in enumerate(tensor))) elif isinstance(tensor, Mapping): return type(tensor)({k: _tpu_broadcast(v, name=f"{name}_{k}") for k, v in tensor.items()}) @@ -612,13 +612,13 @@ def concatenate(data, dim=0): Returns: The same data structure as `data` with all the tensors concatenated. 
""" - if isinstance(data[0], (tuple, list)): + if isinstance(data[0], tuple | list): return honor_type(data[0], (concatenate([d[i] for d in data], dim=dim) for i in range(len(data[0])))) elif isinstance(data[0], Mapping): return type(data[0])({k: concatenate([d[k] for d in data], dim=dim) for k in data[0].keys()}) elif isinstance(data[0], torch.Tensor): return torch.cat(data, dim=dim) - elif isinstance(data, (tuple, list)) and len(data) == 1: + elif isinstance(data, tuple | list) and len(data) == 1: return data[0] else: raise TypeError(f"Can only concatenate tensors but got {type(data[0])}") @@ -840,7 +840,7 @@ def find_device(data): device = find_device(obj) if device is not None: return device - elif isinstance(data, (tuple, list)): + elif isinstance(data, tuple | list): for obj in data: device = find_device(obj) if device is not None: diff --git a/tests/test_tracking.py b/tests/test_tracking.py index 4fee94a61f8..7406afa7c40 100644 --- a/tests/test_tracking.py +++ b/tests/test_tracking.py @@ -594,7 +594,7 @@ def test_swanlab(self): if record is None: continue models_parser.parse_record(record) - header, project, experiment, logs, runtime, columns, scalars, medias, footer = models_parser.get_parsed() + header, project, experiment, logs, runtime, _columns, scalars, medias, _footer = models_parser.get_parsed() # test file header assert header.backup_type == "DEFAULT"