Speed up CI tests :) (#3727)
Co-authored-by: v-chen_data <[email protected]>
KuuCi and v-chen_data authored Dec 2, 2024
1 parent b25dd2b commit d10d442
Showing 5 changed files with 182 additions and 136 deletions.
176 changes: 92 additions & 84 deletions tests/algorithms/test_algorithm_resumption.py
@@ -28,94 +28,102 @@ def test_algorithm_resumption(
alg_cls: type[Algorithm],
world_size,
):
folder1 = os.path.join(tmp_path, 'folder1')
folder2 = os.path.join(tmp_path, 'folder2')
os.makedirs(folder1, exist_ok=True)
os.makedirs(folder2, exist_ok=True)

model = get_alg_model(alg_cls)
alg_kwargs = get_alg_kwargs(alg_cls)

copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point

if alg_cls is LayerFreezing:
pytest.xfail('Known issues')

if alg_cls in (SAM, StochasticDepth):
pytest.xfail('Mismatch in weights when resuming from a checkpoint.')

if alg_cls is GyroDropout:
pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.')

if alg_cls is SWA and world_size > 1:
pytest.xfail('SWA is not implemented in a way that is compatible with correct resumption on multiple devices.')

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)

shared_config = {
'max_duration': '2ep',
'save_filename': 'ep{epoch}-rank{rank}',
'save_interval': '1ep',
'train_subset_num_batches': 2,
'precision': 'amp_bf16',
}
train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)
# train model once, saving checkpoints every epoch
trainer1 = Trainer(
model=model,
train_dataloader=train_dataloader,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder1,
algorithms=alg_cls(**alg_kwargs),
**shared_config,
)
trainer1.fit()

# create second trainer, load an intermediate checkpoint
# and continue training

optimizer = torch.optim.Adam(copied_model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)

alg = alg_cls(**alg_kwargs)
# SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm
# in order to get complete matching of the rng state, we have to cause that extra call to be skipped
# when reloading.
if alg_cls is SeqLengthWarmup:
alg._activated = True # type: ignore

train_dataloader = get_alg_dataloader(alg_cls) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)
trainer2 = Trainer(
model=copied_model,
train_dataloader=train_dataloader,
load_path=os.path.join(folder1, 'ep1-rank{rank}'),
load_weights_only=False,
load_strict_model_weights=False,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder2,
algorithms=alg,
**shared_config,
)
trainer2.fit()
# check that the checkpoints are equal
if world_size == 1 or dist.get_global_rank() == 0:
_assert_checkpoints_equal(
file1=os.path.join(folder1, 'ep2-rank0'),
file2=os.path.join(folder2, 'ep2-rank0'),
# Use RAM-based tmp directory instead of disk
from tempfile import TemporaryDirectory
with TemporaryDirectory() as tmpdir:
folder1 = os.path.join(tmpdir, 'folder1')
folder2 = os.path.join(tmpdir, 'folder2')
os.makedirs(folder1, exist_ok=True)
os.makedirs(folder2, exist_ok=True)

if alg_cls is LayerFreezing:
pytest.xfail('Known issues')

if alg_cls in (SAM, StochasticDepth):
pytest.xfail('Mismatch in weights when resuming from a checkpoint.')

if alg_cls is GyroDropout:
pytest.xfail('GyroDropoutLayer is not implemented in a way that allows correct resumption.')

if alg_cls is SWA and world_size > 1:
pytest.xfail('SWA is not implemented in a way that is compatible with correct resumption on multiple devices.')

model = get_alg_model(alg_cls)
alg_kwargs = get_alg_kwargs(alg_cls)

copied_model = copy.deepcopy(model) # copy the model so the params will start from the same point

optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1)

# Reduce training duration and data
shared_config = {
'max_duration': '2ba',
'save_filename': 'checkpoint_ba{batch}-rank{rank}',
'save_interval': '1ba',
'train_subset_num_batches': 2,
'precision': 'amp_bf16',
}
train_dataloader = get_alg_dataloader(
alg_cls,
) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)
# train model once, saving a checkpoint every batch
trainer1 = Trainer(
model=model,
train_dataloader=train_dataloader,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder1,
algorithms=alg_cls(**alg_kwargs),
**shared_config,
)
trainer1.fit()

# create second trainer, load an intermediate checkpoint
# and continue training

optimizer = torch.optim.SGD(copied_model.parameters(), lr=0.1)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1)

alg = alg_cls(**alg_kwargs)
# SeqLengthWarmup has a call to ._activate_model() that happens on the first call to the algorithm
# in order to get complete matching of the rng state, we have to cause that extra call to be skipped
# when reloading.
if alg_cls is SeqLengthWarmup:
alg._activated = True # type: ignore
train_dataloader = get_alg_dataloader(
alg_cls,
) if world_size == 1 else get_alg_dataloader(alg_cls, multigpu=True)

trainer2 = Trainer(
model=copied_model,
train_dataloader=train_dataloader,
load_path=os.path.join(folder1, 'checkpoint_ba1-rank{rank}'),
load_weights_only=False,
load_strict_model_weights=False,
optimizers=optimizer,
schedulers=scheduler,
save_folder=folder2,
algorithms=alg,
**shared_config,
)
trainer2.fit()

# check that different epoch checkpoints are _not_ equal
# this ensures that the model weights are being updated.
if world_size == 1 or dist.get_global_rank() == 0:
with pytest.raises(AssertionError):
_assert_model_weights_equal(
file1=os.path.join(folder1, 'ep1-rank0'),
file2=os.path.join(folder1, 'ep2-rank0'),
# check that the checkpoints are equal
if world_size == 1 or dist.get_global_rank() == 0:
_assert_checkpoints_equal(
os.path.join(folder1, 'checkpoint_ba2-rank0'),
os.path.join(folder2, 'checkpoint_ba2-rank0'),
)

# check that checkpoints from different batches are _not_ equal
# this ensures that the model weights are being updated.
with pytest.raises(AssertionError):
_assert_model_weights_equal(
os.path.join(folder1, 'checkpoint_ba1-rank0'),
os.path.join(folder1, 'checkpoint_ba2-rank0'),
)


def _assert_checkpoints_equal(file1, file2):
# TODO: consider merging with _assert_checkpoints_equivalent
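
A note on the "RAM-based tmp directory" comment above: tempfile.TemporaryDirectory only lives in RAM when the system temp location is a tmpfs mount. A minimal sketch (not part of this commit) of explicitly preferring a tmpfs path such as /dev/shm when it exists, falling back to the default temp directory otherwise:

import os
from tempfile import TemporaryDirectory

# /dev/shm is a tmpfs (RAM-backed) mount on most Linux hosts; dir=None falls
# back to the default system temp directory.
ram_dir = '/dev/shm' if os.path.isdir('/dev/shm') else None

with TemporaryDirectory(dir=ram_dir) as tmpdir:
    folder1 = os.path.join(tmpdir, 'folder1')
    folder2 = os.path.join(tmpdir, 'folder2')
    os.makedirs(folder1, exist_ok=True)
    os.makedirs(folder2, exist_ok=True)
    # checkpoints written under these folders are removed when the block exits
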
6 changes: 3 additions & 3 deletions tests/algorithms/test_algorithms_train.py
@@ -18,7 +18,7 @@ def test_algorithm_trains(alg_cls: type[Algorithm]):
trainer = Trainer(
model=model,
train_dataloader=dataloader,
max_duration='2ep',
max_duration='2ba',
algorithms=alg_cls(**alg_kwargs),
)
trainer.fit()
@@ -34,5 +34,5 @@ def test_algorithm_trains(alg_cls: type[Algorithm]):
'GyroDropout is implemented to be applied on Event.FIT_START, so is not compatible with multiple calls to fit.',
)

# fit again for another epoch
trainer.fit(duration='1ep')
# fit again for another batch
trainer.fit(duration='1ba')
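
The switch from epoch-based durations ('2ep', '1ep') to batch-based ones ('2ba', '1ba') is what shortens these tests: 'ba' counts optimizer batches, so training stops after two steps instead of two full passes over the data. A small sketch of how these duration strings parse, assuming composer.core exports Time and TimeUnit as in recent Composer releases:

from composer.core import Time, TimeUnit

# '2ba' means two optimizer batches; '2ep' would mean two full epochs.
duration = Time.from_timestring('2ba')
assert duration.value == 2
assert duration.unit == TimeUnit.BATCH
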
8 changes: 3 additions & 5 deletions tests/algorithms/test_gradient_clipping.py
@@ -20,7 +20,7 @@
def simple_model_with_grads():
# Set up small NN with one linear layer with no bias + softmax, so only
# one set of params and get some gradients.
N, hin, num_classes = 8, 4, 3
N, hin, num_classes = 4, 2, 2
x = torch.rand((N, hin))
y = torch.randint(high=num_classes - 1, size=(N,))
model = nn.Sequential(nn.Linear(hin, num_classes, bias=False), nn.Softmax(dim=1))
@@ -47,8 +47,6 @@ def __init__(self, n_ch, num_fmaps, h, num_classes, filter_size):
self.mlp = nn.Sequential(
nn.Linear(num_fmaps, h),
nn.ReLU(),
nn.Linear(h, h),
nn.ReLU(),
nn.Linear(h, num_classes),
nn.Softmax(dim=1),
)
@@ -60,8 +58,8 @@ def forward(self, x):
return out

# Generate some gradients.
N, n_ch, num_fmaps, h, num_classes, filter_size = 8, 3, 4, 4, 3, 3
x = torch.rand((N, n_ch, 16, 16))
N, n_ch, num_fmaps, h, num_classes, filter_size = 4, 1, 2, 2, 2, 2
x = torch.rand((N, n_ch, 8, 8))
y = torch.randint(high=num_classes - 1, size=(N,))
model = myNN(n_ch, num_fmaps, h, num_classes, filter_size)

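
For reference, a self-contained sketch of the shrunken simple_model_with_grads fixture with the backward pass that the hunk above cuts off; the cross-entropy loss here is an assumption, not necessarily the exact loss used in the test file:

import torch
import torch.nn as nn
import torch.nn.functional as F

# Smallest setup that still leaves gradients on a single weight matrix.
N, hin, num_classes = 4, 2, 2
x = torch.rand((N, hin))
y = torch.randint(high=num_classes - 1, size=(N,))
model = nn.Sequential(nn.Linear(hin, num_classes, bias=False), nn.Softmax(dim=1))

loss = F.cross_entropy(model(x), y)
loss.backward()
assert model[0].weight.grad is not None  # the fixture hands these grads to the clipping tests
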
62 changes: 51 additions & 11 deletions tests/test_events.py
@@ -1,7 +1,8 @@
# Copyright 2022 MosaicML Composer authors
# Copyright 2024 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0

import math
from unittest.mock import patch

import pytest
import torch
@@ -22,27 +23,31 @@ def test_event_values(event: Event):

class TestEventCalls:

eval_subset_num_batches = 2
train_subset_num_batches = 2
eval_subset_num_batches = 1
train_subset_num_batches = 1

def get_trainer(self, precision='fp32', **kwargs):
def get_trainer(self, precision='fp32', max_duration='1ep', save_interval='1ep', **kwargs):
model = SimpleModel()
optimizer = torch.optim.Adam(model.parameters())

train_dataset = RandomClassificationDataset()
eval_dataset = RandomClassificationDataset()
train_dataset = RandomClassificationDataset(size=16)
eval_dataset = RandomClassificationDataset(size=16)
train_batch_size = 4

evaluator1 = DataLoader(
dataset=eval_dataset,
batch_size=8,
sampler=dist.get_sampler(eval_dataset),
num_workers=0,
drop_last=True,
)

evaluator2 = DataLoader(
dataset=eval_dataset,
batch_size=4,
sampler=dist.get_sampler(eval_dataset),
num_workers=0,
drop_last=True,
)

return Trainer(
@@ -51,13 +56,15 @@ def get_trainer(self, precision='fp32', **kwargs):
dataset=train_dataset,
batch_size=train_batch_size,
sampler=dist.get_sampler(train_dataset),
num_workers=0,
),
eval_dataloader=(evaluator1, evaluator2),
device_train_microbatch_size=train_batch_size // 2,
precision=precision,
train_subset_num_batches=self.train_subset_num_batches,
eval_subset_num_batches=self.eval_subset_num_batches,
max_duration='2ep',
max_duration=max_duration,
save_interval=save_interval,
optimizers=optimizer,
callbacks=[EventCounterCallback()],
**kwargs,
@@ -101,8 +108,41 @@ def get_trainer(self, precision='fp32', **kwargs):
)
@pytest.mark.parametrize('save_interval', ['1ep', '1ba'])
def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, precision, save_interval):
save_interval = Time.from_timestring(save_interval)

# handle 1ba save interval separately to optimize speed
if save_interval == '1ba':
# mock the save_checkpoint method to speed up batch saves
with patch('composer.trainer.trainer.Trainer.save_checkpoint') as mock_save:
mock_save.return_value = None
self._run_event_calls_test(
world_size,
device,
deepspeed_zero_stage,
use_fsdp,
precision,
save_interval,
num_epochs=1,
)
else:
self._run_event_calls_test(
world_size,
device,
deepspeed_zero_stage,
use_fsdp,
precision,
save_interval,
num_epochs=1,
)

def _run_event_calls_test(
self,
world_size,
device,
deepspeed_zero_stage,
use_fsdp,
precision,
save_interval,
num_epochs,
):
deepspeed_config = None
if deepspeed_zero_stage:
deepspeed_config = {'zero_optimization': {'stage': deepspeed_zero_stage}}
@@ -123,11 +163,11 @@ def test_event_calls(self, world_size, device, deepspeed_zero_stage, use_fsdp, p
deepspeed_config=deepspeed_config,
parallelism_config=parallelism_config,
save_interval=save_interval,
eval_interval=save_interval,
eval_interval=Time.from_timestring(save_interval),
)
trainer.fit()

self._assert_expected_event_calls(trainer, save_interval, num_epochs=2)
self._assert_expected_event_calls(trainer, Time.from_timestring(save_interval), num_epochs=num_epochs)

def _assert_expected_event_calls(self, trainer: Trainer, eval_interval: Time, num_epochs: int):
state = trainer.state
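
The '1ba' branch above skips real checkpoint I/O by patching composer.trainer.trainer.Trainer.save_checkpoint. A stand-alone sketch of the same mock pattern using a hypothetical FakeTrainer (not Composer's Trainer), showing that the patched method becomes a recorded no-op:

from unittest.mock import patch

class FakeTrainer:
    """Hypothetical stand-in for an object with an expensive save method."""

    def save_checkpoint(self, name: str) -> None:
        raise RuntimeError('real checkpoint write; should not run in this sketch')

    def fit(self) -> None:
        for batch in range(1, 3):
            self.save_checkpoint(f'checkpoint_ba{batch}')

# Patching the method on the class turns every per-batch save into a free no-op,
# while call_count still records how often it would have run.
with patch.object(FakeTrainer, 'save_checkpoint', return_value=None) as mock_save:
    FakeTrainer().fit()

assert mock_save.call_count == 2
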