Skip to content

Commit

Permalink
yo
Browse files Browse the repository at this point in the history
  • Loading branch information
snarayan21 committed Dec 3, 2024
1 parent d10d442 commit b29d4ed
Show file tree
Hide file tree
Showing 32 changed files with 176 additions and 1,043 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ The heart of Composer is our Trainer abstraction: a highly optimized PyTorch tra

Whether you’re training on 1 GPU or 512 GPUs, 50MB or 10TB of data - Composer is built to keep your workflow simple.

- [**FSDP**](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp): For large models that are too large to fit on GPUs, Composer has integrated PyTorch [FullyShardedDataParallelism](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp) into our trainer and made it simple to efficiently parallelize custom models. We’ve found FSDP is competitive performance-wise with much more complex parallelism strategies. Alternatively, Composer also supports standard PyTorch distributed data parallelism (DDP) and Deepspeed execution.
- [**FSDP**](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp): For large models that are too large to fit on GPUs, Composer has integrated PyTorch [FullyShardedDataParallelism](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#fullyshardeddataparallel-fsdp) into our trainer and made it simple to efficiently parallelize custom models. We’ve found FSDP is competitive performance-wise with much more complex parallelism strategies. Alternatively, Composer also supports standard PyTorch distributed data parallelism (DDP) execution.
- [**Elastic sharded checkpointing**](https://docs.mosaicml.com/projects/composer/en/stable/notes/distributed_training.html#saving-and-loading-sharded-checkpoints-with-fsdp): Save on eight GPUs, resume on sixteen. Composer supports elastic sharded checkpointing, so you never have to worry if your sharded saved state is compatible with your new hardware setup.
- **Data streaming:** Working with large datasets? Download datasets from cloud blob storage on the fly by integrating with MosaicML [StreamingDataset](https://github.com/mosaicml/streaming) during model training.

Expand Down
27 changes: 7 additions & 20 deletions STYLE_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ As a general rule of thumb,
```python
from typing import Optional

def configure_deepspeed(deepspeed_config: Optional[dict]):
if deepspeed_config is None:
def configure_parallelism(parallelism_config: Optional[dict]):
if parallelism_config is None:
# Don't do this check in the callee, which results in a no-op
return
...
Expand All @@ -67,13 +67,13 @@ As a general rule of thumb,
```python
from typing import Optional

def configure_deepspeed(deepspeed_config: dict):
def configure_parallelism(parallelism_config: dict):
...

def trainer(deepspeed_config: Optional[dict]):
if deepspeed_config is not None:
def trainer(paralellism_config: Optional[dict]):
if paralellism_config is not None:
# Do this check in the caller function
configure_deepspeed(deepspeed_config)
configure_paralellism(paralellism_config)
...
```

Expand Down Expand Up @@ -251,20 +251,7 @@ All imports in composer should be absolute -- that is, they do not begin with a
an optional dependency is missing.
If the corresponding package is not published on Anaconda, then set the ``conda_package`` to the pip package
name, and set ``conda_channel`` to ``None``. For example, with DeepSpeed:
<!--pytest-codeblocks:importorskip(deepspeed)-->
```python
from composer.utils import MissingConditionalImportError
try:
import deepspeed
except ImportError as e:
raise MissingConditionalImportError(extra_deps_group="deepspeed",
conda_package="deepspeed>=0.5.5",
conda_channel=None) from e
```
name, and set ``conda_channel`` to ``None``.
1. If the dependency is core to Composer, add the dependency to the `install_requires` section of
Expand Down
19 changes: 1 addition & 18 deletions composer/algorithms/gradient_clipping/gradient_clipping.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ class GradientClipping(Algorithm):
to (for 'value'), what values to clip the gradient norms to (for 'norm'), and
threshold by which if grad_norm / weight_norm is greater than this threshold then
scale gradients by this threshold * (weight_norm / grad_norm) (for 'adaptive').
Raises:
NotImplementedError: if deepspeed is enabled and clipping_type is not 'norm'.
ValueError: if deepspeed is enabled and clipping_type is not 'norm'.
"""

def __init__(self, clipping_type: str, clipping_threshold: float):
Expand All @@ -136,20 +132,7 @@ def match(self, event: Event, state: State) -> bool:
return event in [Event.INIT, Event.AFTER_TRAIN_BATCH]

def apply(self, event: Event, state: State, logger: Logger) -> Optional[int]:
if event == Event.INIT and state.deepspeed_config is not None:
if self.clipping_type == 'norm':
if self.clipping_threshold > 0:
state.deepspeed_config['gradient_clipping'] = self.clipping_threshold
else:
raise ValueError(
f'Deepspeed only supports gradient clipping thresholds that are greater than zero, but the provided one is {self.clipping_threshold}',
)
else:
raise NotImplementedError(
f"Deepspeed only supports gradient clipping of type 'norm' not of type '{self.clipping_type}'",
)

if event == Event.AFTER_TRAIN_BATCH and not state.deepspeed_enabled:
if event == Event.AFTER_TRAIN_BATCH:
apply_gradient_clipping(
model=state.model,
clipping_type=self.clipping_type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class LowPrecisionGroupNorm(Algorithm):
LPGroupNorm is a thin wrapper around :class:`torch.nn.GroupNorm` which forces the layer to run
in lower precision (torch.float16 or torch.bfloat16) if autocast is enabled. This algorithm has
no effect in FP32 or DeepSpeed FP16 mode, where autocast is disabled.
no effect in FP32, where autocast is disabled.
This algorithm is intended to be used instead of Fused GroupNorm. They have similar behavior and performance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class LowPrecisionLayerNorm(Algorithm):
LPLayerNorm is a thin wrapper around :class:`torch.nn.LayerNorm` which forces the layer to run
in lower precision (torch.float16 or torch.bfloat16) if autocast is enabled. This algorithm has
no effect in FP32 or DeepSpeed FP16 mode, where autocast is disabled.
no effect in FP32, where autocast is disabled.
This algorithm is intended to be used instead of Fused LayerNorm. They have similar behavior and performance.
Expand Down
60 changes: 11 additions & 49 deletions composer/callbacks/checkpoint_saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
ensure_folder_has_no_conflicting_files,
format_name_with_dist,
format_name_with_dist_and_time,
is_model_deepspeed,
parse_uri,
partial_format,
)
Expand Down Expand Up @@ -99,19 +98,12 @@ class CheckpointSaver(Callback): # noqa: D101
* By default, only the rank zero process will save a checkpoint file.
* When using DeepSpeed, each rank will save a checkpoint file in tarball format. DeepSpeed
requires tarball format, as it saves model and optimizer states in separate files.
Ensure that ``'{{rank}}'`` appears within the ``filename``. Otherwise, multiple ranks
may attempt to write to the same file(s), leading to corrupted checkpoints. If no tarball file
extension is specified, ``'.tar'`` will be used.
* To write to compressed tar files (regardless of whether DeepSpeed is enabled), set the file
* To write to compressed tar files, set the file
extension to ``'.tar.gz'``, ``'.tgz'``, ``'.tar.bz2'``, or ``'.tar.lzma'`` (depending on the
desired compression algorithm).
* To write to compressed pt files (when DeepSpeed is disabled), set the file extension to
``'.pt.bz2'``, ``'.pt.gz'``, ``'.pt.lz4'``, ``'.pt.lzma'``, ``'.pt.lzo'``, ``'.pt.xz'``,
``'.pt.zst'``
* To write to compressed pt files, set the file extension to ``'.pt.bz2'``, ``'.pt.gz'``,
``'.pt.lz4'``, ``'.pt.lzma'``, ``'.pt.lzo'``, ``'.pt.xz'``, ``'.pt.zst'``
(depending on the desired algorithm). You must have the corresponding CLI tool installed.
``lz4`` is a good choice for a modest space saving while being very fast to compress.
Expand All @@ -133,14 +125,8 @@ class CheckpointSaver(Callback): # noqa: D101
* The current epoch count is ``1``.
* The current batch count is ``42``.
When DeepSpeed is not being used, the rank zero process will save the checkpoint to
The rank zero process will save the checkpoint to
``"awesome-training-run/checkpoints/ep1-ba42-rank0"``.
When DeepSpeed is being used, each rank (process) will save checkpoints to::
awesome-training-run/checkpoints/ep1-ba42-rank0.tar
awesome-training-run/checkpoints/ep1-ba42-rank1.tar
awesome-training-run/checkpoints/ep1-ba42-rank2.tar
...
remote_file_name (str, optional): Format string for the checkpoint's remote file name.
Expand Down Expand Up @@ -174,16 +160,10 @@ class CheckpointSaver(Callback): # noqa: D101
* The current epoch count is ``1``.
* The current batch count is ``42``.
When DeepSpeed is not being used, the rank zero process will save the checkpoint to
The rank zero process will save the checkpoint to
``'awesome-training-run/checkpoints/ep1-ba42-rank0'``,
and a symlink will be created at
``'awesome-training-run/checkpoints/latest-rank0' -> 'awesome-training-run/checkpoints/ep1-ba42-rank0'``
When DeepSpeed is being used, each rank (process) will save checkpoints to::
awesome-training-run/checkpoints/ep1-ba42-rank0.tar
awesome-training-run/checkpoints/ep1-ba42-rank1.tar
awesome-training-run/checkpoints/ep1-ba42-rank2.tar
...
Corresponding symlinks will be created at::
Expand Down Expand Up @@ -236,7 +216,7 @@ class CheckpointSaver(Callback): # noqa: D101
remote file systems.
weights_only (bool): If ``True``, save only the model weights instead of the entire training state.
This parameter must be ``False`` when using DeepSpeed. Default: ``False``.
Default: ``False``.
ignore_keys (list[str] | (dict) -> None, optional): A list of paths for the ``state_dict`` of the checkpoint,
which, when provided, will be ignored from the state_dict before a checkpoint is saved. Each path is a list
Expand Down Expand Up @@ -269,10 +249,7 @@ class CheckpointSaver(Callback): # noqa: D101
.. note::
When using DeepSpeed, the index of a filepath in each list corresponds to the global rank of
the process that wrote that file. Each filepath is valid only on the process's (rank's) node.
Otherwise, when not using DeepSpeed, each sub-list will contain only one filepath since only rank zero
Each sub-list will contain only one filepath since only rank zero
saves checkpoints.
"""

Expand Down Expand Up @@ -393,9 +370,6 @@ def fit_start(self, state: State, logger: Logger) -> None:

dist.barrier() # holds all ranks until folder check is done

if is_model_deepspeed(state.model) and self.weights_only:
raise NotImplementedError('weights_only=True is not supported when using DeepSpeed.')

self.start_batch = state.timestamp.batch

def batch_checkpoint(self, state: State, logger: Logger):
Expand Down Expand Up @@ -466,13 +440,8 @@ def _upload_checkpoint(
def _save_checkpoint(self, state: State, logger: Logger):
self.last_checkpoint_batch = state.timestamp.batch

is_deepspeed = is_model_deepspeed(state.model)

if is_deepspeed and '{rank}' not in self.filename.filename:
raise ValueError(f'Save filename {self.filename.filename} must have {{rank}} for deepspeed.')

# save the checkpoint to the filename
filename_with_placeholders = self.filename.format(state, is_deepspeed, keep_placeholders=True)
filename_with_placeholders = self.filename.format(state, keep_placeholders=True)
save_filename = checkpoint.get_save_filename(state, filename_with_placeholders)
# Store before saving so state_dict in checkpoint has reference to latest checkpoint (itself)
self.all_saved_checkpoints_to_timestamp[save_filename] = state.timestamp
Expand Down Expand Up @@ -505,7 +474,7 @@ def _save_checkpoint(self, state: State, logger: Logger):

self.rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_sharded_state_dict_enabled
if self.latest_filename is not None and self.num_checkpoints_to_keep != 0:
symlink = self.latest_filename.format(state, is_deepspeed)
symlink = self.latest_filename.format(state)
os.makedirs(os.path.dirname(symlink), exist_ok=True)
try:
os.remove(symlink)
Expand All @@ -524,7 +493,6 @@ def _save_checkpoint(self, state: State, logger: Logger):
if state.fsdp_sharded_state_dict_enabled:
remote_file_name = self.remote_file_name.format(
state,
is_deepspeed,
keep_placeholders=True,
).lstrip('/')
assert state.fsdp_config is not None
Expand All @@ -549,10 +517,7 @@ def _save_checkpoint(self, state: State, logger: Logger):
logger=logger,
)
else:
remote_file_name = self.remote_file_name.format(
state,
is_deepspeed,
).lstrip('/')
remote_file_name = self.remote_file_name.format(state,).lstrip('/')

log.debug(f'Uploading checkpoint to {remote_file_name}')
try:
Expand All @@ -572,10 +537,7 @@ def _save_checkpoint(self, state: State, logger: Logger):

# symlinks stay the same with sharded checkpointing
if self.latest_remote_file_name is not None:
symlink_name = self.latest_remote_file_name.format(
state,
is_deepspeed,
).lstrip('/') + '.symlink'
symlink_name = self.latest_remote_file_name.format(state,).lstrip('/') + '.symlink'

# create and upload a symlink file
symlink_filename = os.path.join(
Expand Down
4 changes: 0 additions & 4 deletions composer/core/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ def find_unused_parameters(self) -> bool:
For example, it is used to tell :class:`torch.nn.parallel.DistributedDataParallel` (DDP) that some parameters
will be frozen during training, and hence it should not expect gradients from them. All algorithms which do any
kind of parameter freezing should override this function to return ``True``.
.. note::
DeepSpeed integration with this function returning True is not tested. It may not work as expected.
"""
return False

Expand Down
23 changes: 2 additions & 21 deletions composer/core/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,10 @@
dist,
ensure_tuple,
get_composer_env_dict,
is_model_deepspeed,
reproducibility,
)

if TYPE_CHECKING:
import deepspeed

from composer.core.algorithm import Algorithm
from composer.core.callback import Callback
from composer.core.evaluator import Evaluator
Expand Down Expand Up @@ -330,7 +327,6 @@ class State(Serializable):
save_metrics (bool, optional): Whether to save metrics in state_dict.
algorithms (Algorithm | Sequence[Algorithm], optional): The algorithms used for training.
callbacks (Callback | Sequence[Callback], optional): The callbacks used for training.
deepspeed_config (dict[str, Any], optional): The configuration dictionary for deepspeed.
parallelism_config (ParallelismConfig, optional): The configuration dictionary for parallelism.
Attributes:
Expand Down Expand Up @@ -397,9 +393,8 @@ class State(Serializable):
.. note::
When using DeepSpeed or multi-rank training, the model will be wrapped with
:class:`~deepspeed.DeepSpeedEngine` or :class:`~torch.nn.parallel.DistributedDataParallel`,
respectively.
When using multi-rank training with DDP, the model will be wrapped with
:class:`~torch.nn.parallel.DistributedDataParallel`.
outputs (torch.Tensor | Sequence[torch.Tensor]): The most recently computed output from the model's forward
pass.
Expand Down Expand Up @@ -497,7 +492,6 @@ def __init__(
callbacks: Optional[Union[Callback, Sequence[Callback]]] = None,

# Distributed training configs
deepspeed_config: Optional[dict[str, Any]] = None,
parallelism_config: Optional[ParallelismConfig] = None,
):
self.rank_zero_seed = rank_zero_seed
Expand Down Expand Up @@ -542,7 +536,6 @@ def __init__(

self.profiler: Optional[Profiler] = None

self.deepspeed_config = deepspeed_config
self.fsdp_config = parallelism_config.fsdp if parallelism_config is not None else None
self.tp_config = parallelism_config.tp if parallelism_config is not None else None

Expand Down Expand Up @@ -880,11 +873,6 @@ def evaluators(self):
def evaluators(self, evaluators: Union[Evaluator, Sequence[Evaluator]]):
self._evaluators[:] = list(ensure_tuple(evaluators))

@property
def deepspeed_enabled(self):
"""Indicates if deepspeed is enabled."""
return self.deepspeed_config is not None

@property
def fsdp_enabled(self):
"""Indicates if FSDP is enabled."""
Expand Down Expand Up @@ -1765,10 +1753,3 @@ def precision_config(self):
def is_model_ddp(self):
"""Whether :attr:`model` is an instance of a :class:`.DistributedDataParallel`."""
return isinstance(self.model, DistributedDataParallel)

@property
def deepspeed_model(self) -> deepspeed.DeepSpeedEngine:
"""Cast :attr:`model` to :class:`~deepspeed.DeepSpeedEngine`."""
if is_model_deepspeed(self.model):
return cast('deepspeed.DeepSpeedEngine', self.model)
raise TypeError('state.model is not a DeepSpeed model')
3 changes: 0 additions & 3 deletions composer/distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

"""Distributed training."""

from composer.distributed.deepspeed import fix_batch_precision_for_deepspeed, parse_deepspeed_config
from composer.distributed.dist_strategy import (
DDPSyncStrategy,
ddp_sync_context,
Expand All @@ -13,8 +12,6 @@
)

__all__ = [
'fix_batch_precision_for_deepspeed',
'parse_deepspeed_config',
'DDPSyncStrategy',
'ddp_sync_context',
'prepare_ddp_module',
Expand Down
Loading

0 comments on commit b29d4ed

Please sign in to comment.