Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: tests on cpu #38

Merged
merged 3 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/modalities/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def evaluate(
) -> Dict[str, EvaluationResultBatch]:
result_dict: Dict[str, EvaluationResultBatch] = {}
model.eval()

device = torch.device(self.local_rank) if torch.cuda.is_available() else "cpu"
lllAlexanderlll marked this conversation as resolved.
Show resolved Hide resolved

for data_loader in data_loaders:
if torch.cuda.is_available():
cummulated_loss = torch.zeros(3).to(torch.device(self.local_rank))
else:
cummulated_loss = torch.zeros(3).to("cpu")
cummulated_loss = torch.zeros(3).to(device)

Evaluator._publish_progress(
batch_progress_publisher=self.batch_progress_publisher,
Expand All @@ -68,7 +68,7 @@ def evaluate(

cummulated_loss[0] += batch_loss.item() # sum up batch loss
cummulated_loss[1] += len(batch)
batch_length_tensor = torch.tensor(len(batch)).to(torch.device(self.local_rank))
batch_length_tensor = torch.tensor(len(batch)).to(device)
thoughput_aggregator.add_value(key=ThroughputAggregationKeys.NUM_SAMPLES, value=batch_length_tensor)

local_dataset_sample_id = Evaluator._get_local_sample_id(
Expand All @@ -90,9 +90,7 @@ def evaluate(
post_processing_fun=lambda t: t[0] / t[1],
)

foward_backward_time = torch.tensor(forward_backward_timer_recorder.delta_t).to(
torch.device(self.local_rank)
)
foward_backward_time = torch.tensor(forward_backward_timer_recorder.delta_t).to(device)
thoughput_aggregator.add_value(
key=ThroughputAggregationKeys.FORWARD_BACKWARD_TIME, value=foward_backward_time
)
Expand Down
8 changes: 4 additions & 4 deletions src/modalities/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def train(
cummulated_loss = self._reset_loss()
thoughput_aggregator = Aggregator[ThroughputAggregationKeys]()

device = torch.device(self.local_rank) if torch.cuda.is_available() else "cpu"
lllAlexanderlll marked this conversation as resolved.
Show resolved Hide resolved

# batch loop
batch: DatasetBatch
# TODO: why do we need a barrier here?
Expand All @@ -88,7 +90,7 @@ def train(
# Save the batch loss
cummulated_loss[0] += batch_loss.item()
cummulated_loss[1] += len(batch)
batch_length_tensor = torch.tensor(len(batch)).to(torch.device(self.local_rank))
batch_length_tensor = torch.tensor(len(batch)).to(device)
thoughput_aggregator.add_value(key=ThroughputAggregationKeys.NUM_SAMPLES, value=batch_length_tensor)
self._publish_progress(
batch_progress_publisher=self.batch_progress_publisher,
Expand All @@ -101,9 +103,7 @@ def train(
# Check, if model should be evaluated
if (local_train_batch_id + 1) % callback_interval_in_batches == 0:
if local_train_batch_id > 0:
foward_backward_time = torch.tensor(forward_backward_time_recorder.delta_t).to(
torch.device(self.local_rank)
)
foward_backward_time = torch.tensor(forward_backward_time_recorder.delta_t).to(device)
forward_backward_time_recorder.reset()

thoughput_aggregator.add_value(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previously used mocking of the "reduce" operation led to problems below. More concretely, line 112 and line 115 both returned None (the return_value), which caused an error in line 118.

Expand Down
25 changes: 11 additions & 14 deletions tests/test_gym.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from unittest.mock import call, patch
from unittest.mock import call

import torch

from modalities.batch import DatasetBatch
from modalities.gym import Gym
from modalities.running_env.fsdp.reducer import Reducer


def test_run_cpu_only(
Expand Down Expand Up @@ -37,15 +36,13 @@ def test_run_cpu_only(
llm_data_loader_mock.__len__ = lambda _: num_batches

gym = Gym(trainer=trainer, evaluator=evaluator_mock, loss_fun=loss_mock, num_ranks=num_ranks)
with patch.object(Reducer, "reduce", return_value=None) as reduce_mock:
gym.run(
model=nn_model_mock,
optimizer=optimizer_mock,
callback_interval_in_batches=int(num_batches),
train_data_loader=llm_data_loader_mock,
evaluation_data_loaders=[],
checkpointing=checkpointing_mock,
)
nn_model_mock.forward.assert_has_calls([call(b.samples) for b in batches])
optimizer_mock.step.assert_called()
reduce_mock.assert_called()
gym.run(
model=nn_model_mock,
optimizer=optimizer_mock,
callback_interval_in_batches=int(num_batches),
train_data_loader=llm_data_loader_mock,
evaluation_data_loaders=[],
checkpointing=checkpointing_mock,
)
nn_model_mock.forward.assert_has_calls([call(b.samples) for b in batches])
optimizer_mock.step.assert_called()
Loading