diff --git a/examples/eval_chunk_size.py b/examples/eval_chunk_size.py
index 40e80038a..9b74da0bb 100644
--- a/examples/eval_chunk_size.py
+++ b/examples/eval_chunk_size.py
@@ -31,26 +31,21 @@
 import logging
 import torch
 
-# from patrickstar.utils.logging import logger
+from patrickstar.utils.logging import logger, log_dist
 from model_builder import build_transformer_model
 from ps_config import get_patrickstar_config
 from parse_args import parse_args
 from patrickstar.core import PatrickStarClient
 from patrickstar.core import PSPreProcessCtx
-
+import time
 from patrickstar.utils.distributed import get_rank
 
-from rich.logging import RichHandler
-
-logger = logging.getLogger(__name__)
-logger.addHandler(RichHandler())
-
 MB_NUM = 1024 * 1024
 GB_NUM = 1024 * MB_NUM
 
 HARDWARE_SETTING_JSON = {
-    "per_cpu_mem": 16 * GB_NUM,
-    "per_gpu_mem": 8 * GB_NUM,
+    "per_cpu_mem": 240 * GB_NUM,
+    "per_gpu_mem": 32 * GB_NUM,
     "global_gpu_num": 1,
     "gloabl_cpu_num": 1,
     "local_gpu_num": 1,
@@ -127,7 +122,7 @@ def get_param_used_chunk_size(args, config, model_func):
         default_chunk_size=args.default_chunk_size,
         config=config.get("client", None),
     )
-
+    start_time = time.time()
     try:
         with PSPreProcessCtx(
             client=client,
@@ -140,7 +135,8 @@ def get_param_used_chunk_size(args, config, model_func):
     except Exception:
         logger.error("PSPreProcessCtx failed")
         return -1, -1
-
+    end_time = time.time()
+    log_dist(f"PSPreProcessCtx Model Constructing elapse {end_time - start_time}")
     del model
 
     overall_chunk_size, util = client.get_overall_chunk_size()
diff --git a/examples/optimizations/ps_tile_modeling_bert.py b/examples/optimizations/ps_tile_modeling_bert.py
index 2cd9fa600..dbf22da99 100644
--- a/examples/optimizations/ps_tile_modeling_bert.py
+++ b/examples/optimizations/ps_tile_modeling_bert.py
@@ -584,7 +584,7 @@ def __init__(self, config, add_pooling_layer=True):
 
         self.pooler = BertPooler(config) if add_pooling_layer else None
 
-        self.init_weights()
+        # self.init_weights()
 
     def get_input_embeddings(self):
         return self.embeddings.word_embeddings
@@ -771,7 +771,7 @@ def __init__(self, config):
         self.dropout = nn.Dropout(classifier_dropout)
         self.classifier = nn.Linear(config.hidden_size, config.num_labels)
 
-        self.init_weights()
+        # self.init_weights()
 
     def forward(
         self,
diff --git a/examples/pretrain_bert_demo.py b/examples/pretrain_bert_demo.py
index 35a7f703e..ca18a933c 100644
--- a/examples/pretrain_bert_demo.py
+++ b/examples/pretrain_bert_demo.py
@@ -39,7 +39,7 @@
 from patrickstar.profiler import profiler
 from patrickstar.runtime import initialize_engine
 from patrickstar.utils import see_memory_usage
-from patrickstar.utils.logging import logger
+from patrickstar.utils.logging import log_dist, logger
 from patrickstar.utils.model_size_calculator import get_ps_model_size
 from model_builder import build_transformer_model
 from parse_args import parse_args
@@ -53,11 +53,6 @@ def test_transformer_model_helper(
     dist_plan: str = "torch",
     num_steps=5,
 ):
-    logger.info(
-        f'test a bert {"fp16" if is_fp16 else "fp32"} model '
-        f'{"with checkpoint" if is_ckp else ""}'
-    )
-
     # Use single card to simulate multicard. Used when you are poor and
     # no more GPU avaiable.
     if args.use_fake_dist:
@@ -117,9 +112,9 @@ def test_transformer_model_helper(
         model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
 
     model_numel, model_num_param = get_ps_model_size(model)
-    logger.info(f"Model size {model_numel / 1e9} B, total params: {model_num_param}")
+    log_dist(f"Model size {model_numel / 1e9} B, total params: {model_num_param}")
     total_macs = model_numel * args.batch_size * sequence_length * 2 * 4
-    logger.info(f"Total MACs: {total_macs/1024/1024/1024/1024} TFlops")
+    log_dist(f"Total MACs: {total_macs/1024/1024/1024/1024} TFlops")
 
     see_memory_usage(
         f"After model init. using {dist_plan}, gradient checkpoint: {is_ckp}, fp16 {is_fp16}",
@@ -145,7 +140,7 @@ def test_transformer_model_helper(
             break
         # You may need to empty_cache for really large models.
        torch.cuda.empty_cache()
-        logger.info(f"Start Step {n} with {dist_plan}...")
+        log_dist(f"Start Step {n} with {dist_plan}...")
 
         step_start_time = time.time()
 
         # Only collect running time of the last iteration.
@@ -201,7 +196,7 @@ def test_transformer_model_helper(
                 f"Step {n} elaspe {step_elapse} s, {total_macs / 1e12 / step_elapse} Tflops"
             )
 
-        logger.info(f"End Step {n} with {dist_plan}.\n")
+        log_dist(f"End Step {n} with {dist_plan}.\n")
 
     if args.with_mem_profiler:
         profiler.end()
@@ -223,7 +218,7 @@ def test_transformer_model_helper(
 
     # You could set the logger level to INFO to view more runtime
     # information.
-    logger.setLevel(logging.WARNING)
+    logger.setLevel(logging.INFO)
     if not torch.distributed.is_initialized():
         torch.distributed.init_process_group(
             backend="gloo" if args.use_fake_dist else "nccl"
diff --git a/examples/run_transformers.sh b/examples/run_transformers.sh
index 37660ded4..1c8f15624 100644
--- a/examples/run_transformers.sh
+++ b/examples/run_transformers.sh
@@ -115,7 +115,7 @@ LOG_DIR="./logs_${MODEL_NAME}"
 mkdir -p ${LOG_DIR}
 
 GIT_VER=`git rev-parse --short=5 HEAD`
-LOG_FILE="log.${MODEL_NAME}_gpu_${GPU_NUM}_cs_${CS}_bs_${BS}_cpueb_${CPU_EBD}_lightseq_${LIGHTSEQ}_offload_${ACT_OFFLOAD}_SP_${SP}_AMM_${AMM}_MSC_${MSC}_CACHE_${CACHE}_TILING_${TILING}_${GIT_VER}"
+LOG_FILE="log.${MODEL_NAME}_gpu_${GPU_NUM}_cs_${CS}_bs_${BS}_cpueb_${CPU_EBD}_offload_${ACT_OFFLOAD}_SP_${SP}_AMM_${AMM}_MSC_${MSC}_CACHE_${CACHE}_TILING_${TILING}_${GIT_VER}"
 
 is_run_flag=`python ./benchmark/is_run_this_file.py --path "${LOG_DIR}" --file "${LOG_FILE}"`
 echo is_run_flag $is_run_flag
@@ -167,7 +167,7 @@ cmd_opts="
 
 if [[ ${CS_SEARCH} == 1 ]]; then
 mkdir -p ./search_res
-SLOG_FILE="./search_res/slog_file.${MODEL_NAME}_bs_${BS}_cpueb_${CPU_EBD}_lightseq_${LIGHTSEQ}_offload_${ACT_OFFLOAD}_SP_${SP}_AMM_${AMM}_MSC_${MSC}_CACHE_${CACHE}_TILING_${TILING}_${GIT_VER}"
+SLOG_FILE="./search_res/slog_file.${MODEL_NAME}_bs_${BS}_cpueb_${CPU_EBD}_offload_${ACT_OFFLOAD}_SP_${SP}_AMM_${AMM}_MSC_${MSC}_CACHE_${CACHE}_TILING_${TILING}_${GIT_VER}"
 rm -rf ${SLOG_FILE}
 
 for((i=312;i>=64;i-=32));
@@ -185,6 +185,6 @@ else
 env OMP_NUM_THREADS=${TNUM} timeout -s SIGKILL 30m python -m torch.distributed.launch --nproc_per_node=${GPU_NUM} \
     pretrain_bert_demo.py \
     --default_chunk_size=${CHUNK_SIZE} \
-    ${cmd_opts}
+    ${cmd_opts} \
     2>&1 | tee ${LOG_DIR}/${LOG_FILE}
 fi
diff --git a/patrickstar/core/client.py b/patrickstar/core/client.py
index 577a2cf09..0121fe269 100644
--- a/patrickstar/core/client.py
+++ b/patrickstar/core/client.py
@@ -31,7 +31,7 @@
 import torch
 
 import patrickstar.utils.global_timer as global_timer
-from patrickstar.utils import logger, get_world_size, get_rank
+from patrickstar.utils import logger, get_world_size, get_rank, log_dist
 from .chunk_list import ChunkList, ChunkType
 from .chunk_tensor_index import ChunkTensorIndex
 from .const import AccessType, ChunkState, TensorState, TrainingStage
@@ -91,7 +91,7 @@ def __init__(self, rank: int, default_chunk_size: int, config=None):
                 self.opt_config["with_async_move"],
             )
         if self.opt_config["with_mem_cache"]:
-            print("[CONFIG] USING MEM CACHE")
+            logger.debug("[CONFIG] USING MEM CACHE")
         self._time_profile = True
 
         if torch.distributed.is_initialized():
@@ -229,7 +229,7 @@ def append_dummy_chunk(self, data_type: torch.dtype, chunk_type: ChunkType):
             AccessType.DATA,
         )
 
-        logger.info(
+        logger.debug(
            f"Append a dummy chunk to the Chunk List {chunk_type} "
            f"comm info {comm_info}"
         )
@@ -464,7 +464,7 @@ def _fetch_remote_chunks(
             "CLIENT_fetch_remote_chunks_allgather"
         )
 
-        logger.info(f"rank {rank} allgather {chunk_id_list}")
+        logger.debug(f"rank {rank} allgather {chunk_id_list}")
         torch.distributed.all_gather(
             allgather_payload_buff,
             self.chunk_list[local_chunk_id].payload,
@@ -924,7 +924,7 @@ def get_overall_chunk_size(self):
             type_chunk_list,
         ) in self.chunk_tensor_index.chunk_type_to_chunk_id_list_map.items():
-            logger.info(f"Chunk list {type}")
+            logger.debug(f"Chunk list {type}")
             for chunk_id in type_chunk_list:
                 chunk = self.chunk_list[chunk_id]
                 if self.opt_config["with_mem_saving_comm"] and chunk.is_dummy():
@@ -943,22 +943,25 @@ def get_overall_chunk_size(self):
         return overall_size, overall_utilization_ratio
 
     def display_chunk_info(self):
-        logger.info("Print chunk list info.")
+        logger.debug("Print chunk list info.")
 
         overall_size = 0
         overall_chunk_num = 0
         overall_utilization_ratio = 0.0
+        max_utilization_ratio = 0
         for (
             type,
             type_chunk_list,
         ) in self.chunk_tensor_index.chunk_type_to_chunk_id_list_map.items():
-            logger.info(f"Chunk list {type}")
+            logger.debug(f"Chunk list {type}")
             for chunk_id in type_chunk_list:
                 chunk = self.chunk_list[chunk_id]
+                if self.opt_config["with_mem_saving_comm"] and chunk.is_dummy():
+                    continue
                 comm_info = self.chunk_tensor_index.chunk_id_to_comm_info_map[chunk_id]
                 assert comm_info is not None
-                logger.info(
+                logger.debug(
                     f"Chunk id {chunk.chunk_id}, state {chunk.get_state()}, "
                     f"comm info {comm_info}, "
                     f"capacity {chunk.capacity / 1024 / 1024} M elems, "
@@ -975,15 +978,18 @@ def display_chunk_info(self):
                         f"tensor_id {info.tensor_id}, state {info.state()}, name {info.tensor_name}"
                     )
                     last_used_pos = max(last_used_pos, info.start_offset + info.numel)
-                logger.info(
+                logger.debug(
                     f"chunk used {last_used_pos/1024/1024} M elem, "
                     f"{last_used_pos/chunk.capacity * 100} %"
                 )
-                overall_utilization_ratio += last_used_pos / chunk.capacity
+                cur_util = last_used_pos / chunk.capacity
+                max_utilization_ratio = max(cur_util, max_utilization_ratio)
+                overall_utilization_ratio += cur_util
                 overall_size += chunk.get_chunk_space()
                 overall_chunk_num += 1
-        logger.info(f"OVERALL CHUNK SIZE {overall_size / 1024 / 1024 / 1024} GB")
-        logger.info(
-            f"OVERALL UTILIZATION {overall_utilization_ratio / overall_chunk_num} %"
+        log_dist(f"OVERALL CHUNK SIZE {overall_size / 1024 / 1024 / 1024} GB")
+        log_dist(
+            f"OVERALL UTILIZATION {overall_utilization_ratio / overall_chunk_num * 100} %"
         )
+        log_dist(f"MAX UTILIZATION {max_utilization_ratio * 100} %")
diff --git a/patrickstar/core/memtracer/memtracer.py b/patrickstar/core/memtracer/memtracer.py
index 2d67bacd5..cee477362 100644
--- a/patrickstar/core/memtracer/memtracer.py
+++ b/patrickstar/core/memtracer/memtracer.py
@@ -34,6 +34,7 @@
 from patrickstar.core.const import TrainingStage
 from patrickstar.profiler import profiler
 from patrickstar.utils import (
+    log_dist,
     get_memory_info,
     get_sys_memory_used,
     get_world_size,
@@ -125,10 +126,6 @@ def __init__(self, local_rank: int = 0, config=None):
             if self.use_async_mem_monitor:
                 self.async_mem_monitor = AsyncMemoryMonitor()
 
-            print(
-                f"[Mem Tracer] Using Asyn Mem Monitor Flag : {self.use_async_mem_monitor}"
-            )
-
         mem_info = get_memory_info()
         local_world_size = get_local_world_size()
         if self.use_fake_dist:
@@ -150,7 +147,7 @@ def __init__(self, local_rank: int = 0, config=None):
                 mem_info.total * self._overall_cpu_mem_ratio / local_world_size
             )
 
-        logger.info(
+        log_dist(
             f"Init Manager over all gpu mem {self._overall_gpu_mem / 1e6} MB, "
             f"cpu mem {self._overall_cpu_mem / 1e6} MB"
         )
@@ -175,14 +172,14 @@ def close_tracer(self):
         """
         if self.use_async_mem_monitor:
             self.async_mem_monitor.finish()
-        print("**** Memory Tracer is closed! ****")
+        log_dist("**** Memory Tracer is closed! ****")
 
     def start_train(self, param_fp16_chunk_size, chunk_size):
         self._param_fp16_chunk_size = param_fp16_chunk_size
         self._default_chunk_size = chunk_size
         if self.use_async_mem_monitor:
             self.async_mem_monitor.start()
-        print("**** Memory Tracer is stared! ****")
+        log_dist("**** Memory Tracer is stared! ****")
 
     def update_margin_mem(self):
         r"""Update the number of GPU free chunks for optimizer."""
@@ -193,6 +190,15 @@ def update_margin_mem(self):
             max_gpu_sys_used = 0
         else:
             max_gpu_sys_used = max(self.gpu_sys_used_list)
+
+        if len(self.cpu_sys_used_list) == 0:
+            logger.warning(
+                "No gpu info collected. Maybe there are no chunk based tensors."
+            )
+            max_cpu_sys_used = 0
+        else:
+            max_cpu_sys_used = max(self.cpu_sys_used_list)
+
         margin_mem_size = (
             self._overall_gpu_mem - max_gpu_sys_used - self._param_fp16_chunk_size
         )
@@ -201,14 +207,16 @@ def update_margin_mem(self):
             (margin_mem_size) / (self._default_chunk_size * 12) * self._margin_use_ratio
         )
 
-        logger.info("--------------- GPU INFO AFTER BWD ----------------")
-        logger.info(f"Max GPU System Mem (non-chunk) Used {max_gpu_sys_used / 1e6} MB")
-        logger.info(f"Param FP16 Chunk Size {self._param_fp16_chunk_size / 1e6} MB")
-        logger.info(
+        log_dist("--------------- GPU INFO AFTER BWD ----------------")
+        log_dist(f"Max GPU System Mem (non-chunk) Used {max_gpu_sys_used / 1e6} MB")
+        log_dist(f"Max CPU System Mem (non-chunk) Used {max_cpu_sys_used / 1e6} MB")
+        log_dist(f"Param FP16 Chunk Size {self._param_fp16_chunk_size / 1e6} MB")
+        log_dist(
             f"Margin Mem Size {margin_mem_size / 1e6} MB, "
             f"available chunk num for Optimizer States {self._margin_chunk_num_for_gpu_adam}"
         )
-        logger.info(f"OVERALL GPU MEM {self._overall_gpu_mem}")
+        log_dist("--------------- GPU INFO AFTER BWD ----------------")
+        logger.debug(f"OVERALL GPU MEM {self._overall_gpu_mem/1024/1024} MB")
 
     def reset_memory_stats(self):
         """
@@ -228,7 +236,7 @@ def reset_memory_stats(self):
         self.gpu_used_list = []
         self.gpu_chunk_used_list = []
         self.gpu_sys_used_list = []
-        logger.info("Reset Memory Statistics")
+        log_dist("Reset Memory Statistics")
 
     def get_margin_chunk_num_for_gpu_adam(self):
         return self._margin_chunk_num_for_gpu_adam
diff --git a/patrickstar/core/preprocess.py b/patrickstar/core/preprocess.py
index 683e634a3..06885ed18 100644
--- a/patrickstar/core/preprocess.py
+++ b/patrickstar/core/preprocess.py
@@ -35,7 +35,7 @@
 from patrickstar.core import register_param, is_param_registered, ParamType
 from patrickstar.manager import _runtime_config
 from patrickstar.ops import Embedding
-from patrickstar.utils import logger, print_rank, get_rank, get_world_size
+from patrickstar.utils import logger, log_dist, print_rank, get_rank, get_world_size
 from patrickstar.utils import see_memory_usage
 
 _orig_torch_empty = torch.empty
@@ -251,7 +251,7 @@ def _post_context_exec(self):
         number of processes.
         3. Add a dummy param at the start of CPU Embedding for huggingface.
         """
-        logger.info("Post Model Init Context")
+        log_dist("Post Model Init Context")
 
         def _origin_new(cls, *arg, **kwargs):
             return object.__new__(cls)
@@ -333,7 +333,7 @@ def _origin_new(cls, *arg, **kwargs):
                 chunk_num += 1
 
         world_size = get_world_size()
-        logger.info(f"param fp16 chunk num {chunk_num}")
+        log_dist(f"Param fp16 chunk num {chunk_num}")
         while chunk_num % world_size != 0:
             self.client.append_dummy_chunk(torch.half, ChunkType.PARAM_FP16)
             chunk_num += 1
@@ -363,8 +363,6 @@ def _post_init_method(self, module):
                     self.client.torch_param_allreduce_list.append(param)
             return
 
-        print_rank(f"Converting Params in {module.__class__.__name__}", force=False)
-
         if not _runtime_config.use_chunk:
             for name, param in module.named_parameters(recurse=False):
                 name = f"{module.__class__.__name__}.{name}_{self.param_idx}"
diff --git a/patrickstar/ops/chunk_io_buff.py b/patrickstar/ops/chunk_io_buff.py
index de8e37bfb..9218f0333 100644
--- a/patrickstar/ops/chunk_io_buff.py
+++ b/patrickstar/ops/chunk_io_buff.py
@@ -135,7 +135,7 @@ def reset(self):
         if self.cached_src_chunk_id is None:
             return
         global_rank = get_rank()
-        logger.info(
+        logger.debug(
             f"global_rank {global_rank} finally, write chunk {self.cached_target_chunk_id}"
         )
         # It's possible that the chunk is empty (no payload), e.g. the process only possesses
@@ -197,13 +197,13 @@ def __init__(
                 gpu_device, chunk_size, torch.half, False
             )
         else:
-            logger.info(
+            logger.debug(
                 f"Allocate fp32 Chunk Buffer of size {chunk_size / 1e6} MB on CPU."
             )
             self.gpu_payload = torch.empty(
                 chunk_size, dtype=torch.half, device=gpu_device
             )
-            logger.info(
+            logger.debug(
                 f"Allocate fp32 Chunk Buffer of size {chunk_size / 1e6} MB on {gpu_device}."
             )
         self.cached_chunk_id = None
diff --git a/patrickstar/ops/fp16_cpu_adam.py b/patrickstar/ops/fp16_cpu_adam.py
index 11610b374..111ce44f5 100644
--- a/patrickstar/ops/fp16_cpu_adam.py
+++ b/patrickstar/ops/fp16_cpu_adam.py
@@ -329,7 +329,7 @@ def fp16_chunk_adam_ops(
     Copy the chunk into a tmp buffer to speed up the memcpy between devices.
     """
     local_rank = client.local_rank
-    logger.info(
+    logger.debug(
         f"local_rank {local_rank} margin_chunk_num_for_gpu_adam {margin_chunk_num_for_gpu_adam}, "
         f"param cnt {len(fp32_param_list)}"
     )
diff --git a/patrickstar/runtime/__init__.py b/patrickstar/runtime/__init__.py
index 469fbdb29..030ed31bb 100644
--- a/patrickstar/runtime/__init__.py
+++ b/patrickstar/runtime/__init__.py
@@ -30,8 +30,9 @@
 import torch
 from patrickstar.core import PSPreProcessCtx, PatrickStarClient
 from patrickstar.core.memtracer import RuntimeMemTracer
-from patrickstar.utils import logger
+from patrickstar.utils import logger, log_dist
 from .engine import PatrickStarEngine
+import time
 
 DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024
 
@@ -73,6 +74,8 @@ def initialize_engine(model_func, local_rank, config=None, client=None):
             config=config.get("client", None),
         )
 
+    start_time = time.time()
+    log_dist("begin initialize the model parameters...")
     with PSPreProcessCtx(
         client=client,
         dtype=torch.float,
@@ -80,6 +83,10 @@ def initialize_engine(model_func, local_rank, config=None, client=None):
         use_cpu_embedding=use_cpu_embedding,
     ):
         model = model_func()
+    end_time = time.time()
+    log_dist(
+        f"finished initialized the model parameters... {end_time - start_time} s"
+    )
 
     engine = PatrickStarEngine(model=model, client=client, config=config)
     client.start_mem_tracer()
diff --git a/patrickstar/runtime/engine.py b/patrickstar/runtime/engine.py
index 7d97689a9..d235daafc 100644
--- a/patrickstar/runtime/engine.py
+++ b/patrickstar/runtime/engine.py
@@ -32,7 +32,7 @@
 from patrickstar.core import ChunkState, TensorState, TrainingStage, ParamType
 from patrickstar.fp16 import LossScaler, DynamicLossScaler
 from patrickstar.ops import FP16Adam
-from patrickstar.utils import logger, global_timer
+from patrickstar.utils import log_dist, global_timer
 from .checkpoint import state_dict, load_state_dict
 from patrickstar.profiler import profiler
 
@@ -86,7 +86,7 @@ def __init__(self, model, client, config):
                 ), "Must have `loss_scale` field set."
                 loss_scale = loss_scale_config["loss_scale"]
                 if loss_scale == 0:
-                    logger.info("Use DynamicLossScaler")
+                    log_dist("Use DynamicLossScaler")
                     self.loss_scaler = DynamicLossScaler(
                         init_scale=(
                             2 ** loss_scale_config.get("initial_scale_power", 16)
@@ -129,7 +129,7 @@ def __init__(self, model, client, config):
         self.iteration_cnt_ = 0
         # TODO(jiaruifang) pass in via config.
         self.warmup_times = 1
-        logger.info("PatrickStarEngine initialized.")
+        log_dist("PatrickStarEngine initialized.")
 
     def _move_torch_parts_to_gpu(self, model):
         # TODO(zilinzhu) Currently we move all buffers to GPU as the buffer size is
diff --git a/patrickstar/utils/global_timer.py b/patrickstar/utils/global_timer.py
index 240484840..74fd47ef4 100644
--- a/patrickstar/utils/global_timer.py
+++ b/patrickstar/utils/global_timer.py
@@ -29,7 +29,7 @@
 
 import time
 
-from .logging import logger
+# from .logging import logger
 from .singleton_meta import SingletonMeta
 
 
@@ -72,7 +72,7 @@ def reset(self):
     def print(self):
         if not self.start_flag:
             return
-        logger.info("------------- PROFILE RESULTS ----------------")
+        print("------------- PROFILE RESULTS ----------------")
         dot_length = 20
         for k in self.elapse_stat:
             dot_length = max(dot_length, len(k) + 2)
diff --git a/patrickstar/utils/logging.py b/patrickstar/utils/logging.py
index bdc8a23f2..1dea2e72c 100644
--- a/patrickstar/utils/logging.py
+++ b/patrickstar/utils/logging.py
@@ -28,6 +28,7 @@
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 import logging
 import sys
+from rich.logging import RichHandler
 
 import torch.distributed as dist
 
@@ -57,14 +58,14 @@ def create_logger(name=None, level=logging.WARNING):
         logger_.propagate = False
         ch = logging.StreamHandler(stream=sys.stdout)
         ch.setFormatter(formatter)
-        logger_.addHandler(ch)
+        logger_.addHandler(RichHandler())
         return logger_
 
 
 logger = LoggerFactory.create_logger(name="PatrickStar", level=logging.WARNING)
 
 
-def log_dist(message, ranks=None, level=logging.INFO):
+def log_dist(message, ranks=[0], level=logging.INFO):
     """Log message when one of following condition meets
 
     + not dist.is_initialized()
     + dist.get_rank() in ranks if ranks is not None or ranks = [-1]
diff --git a/requirements.txt b/requirements.txt
index 377edef87..169c74355 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ torch>=1.5.0
 pytest
 psutil
 ninja
+rich