
Add MLPerf logging #831

Merged: 63 commits, May 3, 2022
Changes from 7 commits
c52f47b
draft mlperf logger
hanlint Mar 18, 2022
1d75c3a
add to callbacks module
hanlint Mar 21, 2022
caab2e9
add mlperf logging callback
hanlint Mar 23, 2022
8f2fee6
add submission directory structure
hanlint Mar 25, 2022
69bb806
add mlperf to setup
hanlint Mar 25, 2022
0813bb6
fix duplicate logging
hanlint Mar 25, 2022
43c74cd
Merge branch 'dev' into hanlin/mlperf
hanlint Mar 25, 2022
d2153d2
Apply suggestions from code review
hanlint Mar 28, 2022
8bbd7cd
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 18, 2022
e010476
update with current_metrics
hanlint Apr 18, 2022
9d588f7
fix setup
hanlint Apr 19, 2022
bee409f
fix docstrings
hanlint Apr 19, 2022
f70406b
add hparams object
hanlint Apr 19, 2022
ba8652f
fix error
hanlint Apr 19, 2022
7ac866b
skip callback in asset test
hanlint Apr 19, 2022
03758b1
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 19, 2022
689d84c
cleanup
hanlint Apr 19, 2022
f02eef3
try removing world_size
hanlint Apr 19, 2022
6491e8c
restore world_size
hanlint Apr 19, 2022
5fe7957
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 19, 2022
b1b6004
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 19, 2022
465f76f
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint Apr 19, 2022
d80e39d
trying removing mlperf tag
hanlint Apr 19, 2022
99bb2ab
cleanup
hanlint Apr 19, 2022
50ae088
Merge branch 'dev' into hanlin/mlperf
ravi-mosaicml Apr 19, 2022
73931a8
please jenkins help
hanlint Apr 19, 2022
2139ae0
one more time
hanlint Apr 19, 2022
6696fdb
never say timeout
hanlint Apr 19, 2022
7ff3392
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint Apr 19, 2022
e03c14e
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 19, 2022
fed0d3f
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 20, 2022
09ed9e5
remove world_size again
hanlint Apr 20, 2022
95c26fc
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint Apr 20, 2022
f2d3c51
Merge branch 'dev' into hanlin/mlperf
ravi-mosaicml Apr 20, 2022
7034f13
remove logging pip
hanlint Apr 21, 2022
67c0640
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint Apr 21, 2022
24e7439
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 21, 2022
e497ce9
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint Apr 21, 2022
2862776
address comments
hanlint Apr 23, 2022
e0a13a4
implement cache clear
hanlint Apr 23, 2022
aae087b
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 25, 2022
5585ce8
fix doctest
hanlint Apr 26, 2022
a524a65
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint Apr 26, 2022
a553110
Merge branch 'dev' into hanlin/mlperf
hanlint Apr 27, 2022
6a2c637
Update composer/callbacks/mlperf.py
hanlint Apr 29, 2022
8f4ea2f
address comments
hanlint Apr 29, 2022
a80a208
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint Apr 29, 2022
cc4d9be
restore dataloaders to state
hanlint May 3, 2022
a431577
cleanup
hanlint May 3, 2022
4cbd163
move items to init
hanlint May 3, 2022
b7fd11e
Merge branch 'dev' into hanlin/mlperf
hanlint May 3, 2022
cdeac03
fix pyright
hanlint May 3, 2022
212b089
clean up tests
hanlint May 3, 2022
13066df
use code block because cannot automate testcode
hanlint May 3, 2022
f8c9732
Apply suggestions from code review
hanlint May 3, 2022
b689f86
address comments
hanlint May 3, 2022
1705a61
Merge branch 'dev' into hanlin/mlperf
hanlint May 3, 2022
051035b
cleanup
hanlint May 3, 2022
afe5313
type ignore until logging pypi is done
hanlint May 3, 2022
ec2a578
Merge branch 'dev' into hanlin/mlperf
hanlint May 3, 2022
2480a2b
cleanup
hanlint May 3, 2022
0136d3d
Merge branch 'hanlin/mlperf' of github.com:mosaicml/composer into han…
hanlint May 3, 2022
621d12e
cleanup
hanlint May 3, 2022
2 changes: 2 additions & 0 deletions composer/callbacks/__init__.py
@@ -12,6 +12,7 @@
from composer.callbacks.grad_monitor import GradMonitor
from composer.callbacks.lr_monitor import LRMonitor
from composer.callbacks.memory_monitor import MemoryMonitor
from composer.callbacks.mlperf import MLPerfCallback
from composer.callbacks.run_directory_uploader import RunDirectoryUploader
from composer.callbacks.speed_monitor import SpeedMonitor

@@ -22,6 +23,7 @@
"RunDirectoryUploader",
"SpeedMonitor",
"CheckpointSaver",
"MLPerfCallback",
# hparams objects
"CallbackHparams",
"CheckpointSaverHparams",
285 changes: 285 additions & 0 deletions composer/callbacks/mlperf.py
@@ -0,0 +1,285 @@
import json
import logging
import os
import platform
import sys
import warnings
from typing import Dict, Optional

import cpuinfo
import psutil
import torch

import composer
from composer import Callback, State
from composer.loggers import Logger
from composer.utils import dist

try:
from mlperf_logging import mllog
from mlperf_logging.mllog import constants
mlperf_available = True
except ImportError:
mlperf_available = False

BENCHMARKS = ("resnet",)
DIVISIONS = ("open",)
STATUS = ("onprem", "cloud", "preview")


def rank_zero() -> bool:
return dist.get_global_rank() == 0


class MLPerfCallback(Callback):
"""Creates a compliant results file for MLPerf Training benchmark.

A submission folder structure will be created with the ``root_folder``
as the base and the following directories:

root_folder/
results/
[system_name]/
[benchmark]/
results_0.txt
results_1.txt
...
systems/
[system_name].json

A required system description file will be generated automatically, with a
best effort made to populate its fields, but it should be checked manually
prior to submission.

Currently, only OPEN division submissions are supported with this Callback.

Args:
root_folder (str): The root submission folder.
index (int): The repetition index of this run. The filename created will be
``result_[index].txt``.
submitter (str, optional): Submitting organization. Default: ``"MosaicML"``.
system_name (str, optional): Name of the system (e.g. ``8xA100_composer``). If
not provided, the system name defaults to ``[world_size]x[device_name]_composer``,
e.g. ``8xNVIDIA_A100_80GB_composer``.
benchmark (str, optional): Benchmark name. Default: ``"resnet"``.
division (str, optional): Division of submission. Currently only open division is
supported. Default: ``"open"``.
status (str, optional): Submission status. One of (onprem, cloud, or preview).
Default: ``"onprem"``.
target (float, optional): The target metric before the mllogger marks the stop
of the timing run. Default: ``0.759`` (resnet benchmark).
"""

def __init__(
self,
root_folder: str,
index: int,
submitter: str = "MosaicML",
system_name: Optional[str] = None,
benchmark: str = "resnet",
division: str = "open",
status: str = "onprem",
target: float = 0.759,
) -> None:

if benchmark not in BENCHMARKS:
raise ValueError(f"benchmark: {benchmark} must be one of {BENCHMARKS}")
if division not in DIVISIONS:
raise ValueError(f"division: {division} must be one of {DIVISIONS}")
if status not in STATUS:
raise ValueError(f"status: {status} must be one of {STATUS}")
if not mlperf_available:
raise ValueError("mlperf_logging is not installed, but is required for MLPerfCallback.")
self.mllogger = mllog.get_mllogger()
self.target = target
self.system_name = system_name
self.benchmark = benchmark
self.root_folder = root_folder

system_desc = get_system_description(submitter, division, status, system_name)
system_name = system_desc['system_name']

self._create_submission_folders(root_folder, system_name, benchmark)

# save system description file
systems_path = os.path.join(root_folder, 'systems', f'{system_name}.json')
if os.path.exists(systems_path):
with open(systems_path, 'r') as f:
existing_systems_desc = json.load(f)
if sorted(existing_systems_desc.items()) != sorted(system_desc.items()):
raise ValueError(f'Existing system description in {systems_path} does not match this machine.')
else:
with open(systems_path, 'w') as f:
json.dump(system_desc, f, indent=4)

filename = os.path.join(root_folder, 'results', system_name, benchmark, f'result_{index}.txt')
if os.path.exists(filename):
raise FileExistsError(f'{filename} already exists.')

self._file_handler = logging.FileHandler(filename)
self._file_handler.setLevel(logging.INFO)
self.mllogger.logger.addHandler(self._file_handler)

# TODO: implement cache clearing
self.mllogger.start(key=mllog.constants.CACHE_CLEAR)
self.mllogger.start(key=mllog.constants.INIT_START)

if rank_zero():
self._log_dict({
constants.SUBMISSION_BENCHMARK: benchmark,
constants.SUBMISSION_DIVISION: division,
constants.SUBMISSION_ORG: submitter,
constants.SUBMISSION_PLATFORM: system_name,
constants.SUBMISSION_STATUS: status,
})
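The existing-file check earlier in ``__init__`` follows a simple write-or-verify pattern: write the system description JSON if absent, otherwise require an exact match with what is on disk. A stdlib-only sketch of that pattern (the helper name and sample fields are illustrative, not part of the callback's API):

```python
import json
import os
import tempfile

def write_or_verify_system_desc(path: str, desc: dict) -> None:
    """Write the system description, or verify it matches an existing one."""
    if os.path.exists(path):
        with open(path, "r") as f:
            existing = json.load(f)
        # Comparing sorted items catches any added, removed, or changed field.
        if sorted(existing.items()) != sorted(desc.items()):
            raise ValueError(f"Existing system description in {path} does not match this machine.")
    else:
        with open(path, "w") as f:
            json.dump(desc, f, indent=4)

folder = tempfile.mkdtemp()
path = os.path.join(folder, "my_system.json")
write_or_verify_system_desc(path, {"submitter": "MosaicML", "status": "onprem"})
write_or_verify_system_desc(path, {"submitter": "MosaicML", "status": "onprem"})  # matches: no error
```

This makes repeated runs on the same machine idempotent, while a mismatched machine fails loudly rather than silently corrupting the submission.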

def _create_submission_folders(self, root_folder: str, system_name: str, benchmark: str):
if not os.path.isdir(root_folder):
raise FileNotFoundError(f"{root_folder} not found.")

results_folder = os.path.join(root_folder, 'results')
log_folder = os.path.join(root_folder, 'results', system_name)
benchmark_folder = os.path.join(log_folder, benchmark)
systems_folder = os.path.join(root_folder, 'systems')

os.makedirs(results_folder, exist_ok=True)
os.makedirs(log_folder, exist_ok=True)
os.makedirs(benchmark_folder, exist_ok=True)
os.makedirs(systems_folder, exist_ok=True)

def _log_dict(self, data: Dict):
for key, value in data.items():
self.mllogger.event(key=key, value=value)

def fit_start(self, state: State, logger: Logger) -> None:
if rank_zero():
if state.train_dataloader.batch_size is None:
raise ValueError("Batch size is required to be set for dataloader.")

self._log_dict({
constants.SEED: state.seed,
constants.GLOBAL_BATCH_SIZE: state.train_dataloader.batch_size * dist.get_world_size(),
constants.GRADIENT_ACCUMULATION_STEPS: state.grad_accum,
constants.TRAIN_SAMPLES: len(state.train_dataloader.dataset),
constants.EVAL_SAMPLES: len(state.evaluators[0].dataloader.dataloader.dataset)
})

self.mllogger.event(key=constants.INIT_STOP)

dist.barrier()
if rank_zero():
self.mllogger.event(key=constants.RUN_START)
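``GLOBAL_BATCH_SIZE`` in ``fit_start`` above is derived rather than read from a config: the dataloader's per-device batch size times the number of ranks. A quick sketch of the arithmetic (hypothetical values):

```python
per_device_batch_size = 4   # state.train_dataloader.batch_size on each rank
world_size = 8              # dist.get_world_size()

# Each rank draws its own batch, so the effective global batch is the product.
global_batch_size = per_device_batch_size * world_size
print(global_batch_size)  # 32
```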

def epoch_start(self, state: State, logger: Logger) -> None:
if rank_zero():
self.mllogger.event(key=constants.EPOCH_START, metadata={'epoch_num': state.timer.epoch.value})
self.mllogger.event(key=constants.BLOCK_START,
metadata={
'first_epoch_num': state.timer.epoch.value,
'epoch_count': 1
})

def epoch_end(self, state: State, logger: Logger) -> None:
if rank_zero():
self.mllogger.event(key=constants.EPOCH_STOP, metadata={'epoch_num': state.timer.epoch.value})

def eval_start(self, state: State, logger: Logger) -> None:
if rank_zero():
self.mllogger.event(key=constants.EVAL_START, metadata={'epoch_num': state.timer.epoch.value})

def eval_end(self, state: State, logger: Logger) -> None:
if rank_zero():
accuracy = 0.99 # TODO: retrieve accuracy from metrics

self.mllogger.event(key=constants.EVAL_STOP, metadata={'epoch_num': state.timer.epoch.value})
self.mllogger.event(key=constants.EVAL_ACCURACY,
value=accuracy,
metadata={'epoch_num': state.timer.epoch.value})
self.mllogger.event(key=constants.BLOCK_STOP, metadata={'first_epoch_num': state.timer.epoch.value})

if accuracy > self.target:
self.mllogger.event(key=constants.RUN_STOP, metadata={"status": "success"})
self.mllogger.logger.removeHandler(self._file_handler)
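The callback routes mllog records into the per-run results file purely through the standard ``logging`` machinery, as wired up in ``__init__`` and torn down here. A stdlib-only sketch of that pattern (the logger name and message are illustrative):

```python
import logging
import os
import tempfile

log_path = os.path.join(tempfile.mkdtemp(), "result_0.txt")

logger = logging.getLogger("mllog_sketch")
logger.setLevel(logging.INFO)

# Attaching a FileHandler tees every record emitted through this logger into
# the results file, mirroring how MLPerfCallback attaches to mllogger.logger.
handler = logging.FileHandler(log_path)
handler.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info(":::MLLOG demo event")

# Removing and closing the handler stops further writes, as eval_end does
# once the target accuracy is reached.
logger.removeHandler(handler)
handler.close()

with open(log_path) as f:
    contents = f.read()
```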

hanlint marked this conversation as resolved.
Show resolved Hide resolved

def get_system_description(
submitter: str,
division: str,
status: str,
system_name: Optional[str] = None,
) -> Dict[str, str]:
"""Generates a valid system description.

A best effort is made to auto-populate some of the fields, but the result
should be checked manually prior to submission. The system name is
auto-generated as "[world_size]x[device_name]_composer", e.g.
"8xNVIDIA_A100_80GB_composer".

Args:
submitter (str): Name of the submitter organization
division (str): Submission division (open, closed)
status (str): System status (cloud, onprem, preview)
system_name (str, optional): Name of the system; auto-generated if not provided.

Returns:
system description as a dictionary
"""
is_cuda = torch.cuda.is_available()
cpu_info = cpuinfo.get_cpu_info()

system_desc = {
"submitter": submitter,
"division": division,
"status": status,
"number_of_nodes": dist.get_world_size() // dist.get_local_world_size(),
"host_processors_per_node": "",
"host_processor_model_name": str(cpu_info.get('brand_raw', "CPU")),
"host_processor_core_count": str(psutil.cpu_count(logical=False)),
"host_processor_vcpu_count": "",
"host_processor_frequency": cpu_info.get('hz_advertised_friendly', ""),
"host_processor_caches": "",
"host_processor_interconnect": "",
"host_memory_capacity": "",
"host_storage_type": "",
"host_storage_capacity": "",
"host_networking": "",
"host_networking_topology": "",
"host_memory_configuration": "",
"accelerators_per_node": str(dist.get_local_world_size()) if is_cuda else "0",
"accelerator_model_name": str(torch.cuda.get_device_name(None)) if is_cuda else "",
"accelerator_host_interconnect": "",
"accelerator_frequency": "",
"accelerator_on-chip_memories": "",
"accelerator_memory_configuration": "",
"accelerator_memory_capacity": "",
"accelerator_interconnect": "",
"accelerator_interconnect_topology": "",
"cooling": "",
"hw_notes": "",
"framework": f"PyTorch v{torch.__version__} and MosaicML composer v{composer.__version__}",
"other_software_stack": {
"cuda_version": torch.version.cuda if is_cuda else "",
"composer_version": composer.__version__,
"python_version": sys.version,
},
"operating_system": f"{platform.system()} {platform.release()}",
"sw_notes": "",
}

if system_desc['number_of_nodes'] != 1:
warnings.warn("Number of nodes > 1 not tested, proceed with caution.")

if system_name is None:
world_size = dist.get_world_size()
if is_cuda:
device_name = system_desc['accelerator_model_name']
else:
device_name = system_desc['host_processor_model_name']

device_name = device_name.replace(' ', '_')
system_name = f"{world_size}x{device_name}_composer"

# default to system name as "[world_size]x[device_name]"
# e.g. 8xNVIDIA_A100_80GB
system_desc['system_name'] = system_name
return system_desc
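The default system name produced above follows a simple scheme: world size, then the device name with spaces made filesystem-safe, then a ``_composer`` suffix. A minimal sketch (``default_system_name`` is a hypothetical helper, not part of the module):

```python
def default_system_name(world_size: int, device_name: str) -> str:
    # Spaces in the device name are replaced with underscores,
    # e.g. "NVIDIA A100 80GB" -> "NVIDIA_A100_80GB".
    return f"{world_size}x{device_name.replace(' ', '_')}_composer"

print(default_system_name(8, "NVIDIA A100 80GB"))  # 8xNVIDIA_A100_80GB_composer
```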
9 changes: 8 additions & 1 deletion setup.py
@@ -31,6 +31,7 @@ def run(self):
# From https://github.com/pypa/pip/issues/7953#issuecomment-645133255
site.ENABLE_USER_SITE = _IS_USER


def package_files(prefix: str, directory: str, extension: str):
# from https://stackoverflow.com/a/36693250
paths = []
@@ -40,6 +41,7 @@ def package_files(prefix: str, directory: str, extension: str):
paths.append(os.path.relpath(os.path.join(path, filename), prefix))
return paths


with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()

@@ -151,7 +153,12 @@ def package_files(prefix: str, directory: str, extension: str):
"wurlitzer>=3.0.2,<4",
]

extra_deps["all"] = set(dep for deps in extra_deps.values() for dep in deps)
extra_deps['mlperf'] = [
# TODO: switch to pip package when available: https://github.com/mlcommons/logging/issues/218
'git+https://github.com/mlperf/logging.git',
'py-cpuinfo>=8.0.0,<9',
]
extra_deps['all'] = set(dep for deps in extra_deps.values() for dep in deps)

composer_data_files = ["py.typed"]
composer_data_files += package_files("composer", "yamls", ".yaml")
Expand Down
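The ``all`` extra at the end of the setup.py hunk is simply the union of every other extra; a minimal sketch of that aggregation (the package names here are illustrative):

```python
extra_deps = {
    "mlperf": ["py-cpuinfo>=8.0.0,<9"],
    "dev": ["pytest>=7", "py-cpuinfo>=8.0.0,<9"],
}

# The set comprehension deduplicates requirements shared across extras,
# mirroring the `extra_deps['all']` line in setup.py.
extra_deps["all"] = set(dep for deps in extra_deps.values() for dep in deps)
print(len(extra_deps["all"]))  # 2
```

Because ``all`` is computed after every other extra is defined, adding the new ``mlperf`` extra above the ``all`` line automatically includes it in ``pip install composer[all]``.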
65 changes: 65 additions & 0 deletions tests/callbacks/test_mlperf_callback.py
@@ -0,0 +1,65 @@
import logging

import numpy as np
import pytest
from torch.utils.data import DataLoader

from composer import Trainer
from composer.callbacks import MLPerfCallback
from tests.common import RandomClassificationDataset, SimpleModel

logging.basicConfig(filename="package_checker.log", level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
formatter = logging.Formatter("%(levelname)s - %(message)s")
logging.getLogger().handlers[0].setFormatter(formatter)
logging.getLogger().handlers[1].setFormatter(formatter)

Show resolved Hide resolved

@pytest.fixture
def config():
"""Returns the reference config."""

return {
'model': SimpleModel(),
'train_dataloader': DataLoader(
dataset=RandomClassificationDataset(),
batch_size=4,
shuffle=False,
),
'eval_dataloader': DataLoader(
dataset=RandomClassificationDataset(),
shuffle=False,
),
'max_duration': '2ep',
'deterministic_mode': True, # testing equivalence
'loggers': [], # no progress bar
'callbacks': []
}


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_mlperf_callback(config, tmpdir):
tmpdir = str(tmpdir)  # use the pytest-provided temporary directory
pytest.importorskip("mlperf_logging")

for run in range(5):
mlperf_callback = MLPerfCallback(root_folder=tmpdir, index=run)
config['callbacks'] = [mlperf_callback]
config['seed'] = np.random.randint(int(2e5))  # mlperf seeds are released near submission deadline
trainer = Trainer(**config)
trainer.fit()

# run result checker
from mlperf_logging.package_checker.package_checker import check_training_package

check_training_package(
folder=tmpdir,
usage="training",
ruleset="1.1.0",
werror=True,
quiet=False,
rcp_bypass=False,
rcp_bert_train_samples=False,
log_output="package_checker.log",
)
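Each repetition in the loop above gets its own ``result_[index].txt``, and the callback refuses to overwrite an existing one. A stdlib-only sketch of that naming and collision guard (the helper name is illustrative):

```python
import os
import tempfile

def result_path(root: str, system: str, benchmark: str, index: int) -> str:
    # Mirrors the callback's layout: results/<system>/<benchmark>/result_<index>.txt
    path = os.path.join(root, "results", system, benchmark, f"result_{index}.txt")
    if os.path.exists(path):
        raise FileExistsError(f"{path} already exists.")
    return path

root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "results", "8xA100_composer", "resnet"))
first = result_path(root, "8xA100_composer", "resnet", 0)
open(first, "w").close()  # simulate a completed run writing its log
```

Refusing to overwrite keeps a five-run submission package internally consistent: a re-run with a stale index fails immediately instead of clobbering a prior result.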