diff --git a/3rdparty/Automodel-workspace/Automodel b/3rdparty/Automodel-workspace/Automodel
index 277a8a8d95..c84b9c401b 160000
--- a/3rdparty/Automodel-workspace/Automodel
+++ b/3rdparty/Automodel-workspace/Automodel
@@ -1 +1 @@
-Subproject commit 277a8a8d951f6d8bf030d34915cfa61b88eebffd
+Subproject commit c84b9c401b7633340846c77773b48a7b9be90d1f
diff --git a/examples/configs/grpo_math_moonlight_te_deepep.yaml b/examples/configs/grpo_math_moonlight_te_deepep.yaml
new file mode 100644
index 0000000000..ce38638f7c
--- /dev/null
+++ b/examples/configs/grpo_math_moonlight_te_deepep.yaml
@@ -0,0 +1,248 @@
+# GRPO Algorithm Configuration
+grpo:
+  num_prompts_per_step: 32
+  num_generations_per_prompt: 16
+  max_rollout_turns: 1 # for multi-turn rollouts. Math Environments just have 1 turn (answering the question)
+  max_num_epochs: 1
+  max_num_steps: 1000000
+  normalize_rewards: true
+  use_leave_one_out_baseline: true
+  val_period: 10
+  val_at_start: false
+  overlong_filtering: false
+  max_val_samples: 256
+  val_batch_size: 256
+  seed: 42
+
+loss_fn:
+  reference_policy_kl_penalty: 0.01
+  ratio_clip_min: 0.2
+  ratio_clip_max: 0.2
+  ratio_clip_c: null
+  # (default off) loss formulation improvements (docs/guides/grpo.md#loss)
+  use_on_policy_kl_approximation: false
+  use_importance_sampling_correction: false
+  sequence_level_importance_ratios: false
+  token_level_loss: true
+
+checkpointing:
+  enabled: true
+  checkpoint_dir: "results/grpo"
+  metric_name: "val_reward"
+  higher_is_better: true
+  keep_top_k: 3
+  save_period: 10
+  checkpoint_must_save_by: null
+
+policy:
+  model_name: "moonshotai/Moonlight-16B-A3B"
+  tokenizer:
+    name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default
+  train_global_batch_size: 256
+  train_micro_batch_size: 4
+  generation_batch_size: 32 # Only used when generating using HF backend
+  logprob_batch_size: 4
+  max_total_sequence_length: 2048
+  precision: "bfloat16"
+  logprob_chunk_size: null
+
+  dtensor_cfg:
+    _v2: true
+    enabled: true
+    cpu_offload: False
+    sequence_parallel: false
+    activation_checkpointing: false
+    tensor_parallel_size: 1
+    context_parallel_size: 1
+    pipeline_parallel_size: 1
+    expert_parallel_size: 8
+    custom_parallel_plan: null
+
+    # v2 only attributes
+    parallelize_fn: nemo_automodel.components.models.deepseek_v3.parallelizer.parallelize_model
+    model:
+      _target_: nemo_automodel.components.models.deepseek_v3.model.DeepseekV3ForCausalLM.from_config
+      num_layers: null # Optionally override the number of hidden layers (null = use model default)
+      backend:
+        _target_: nemo_automodel.components.moe.utils.BackendConfig
+        attn: te
+        linear: te
+        rms_norm: te
+        enable_deepep: true
+        fake_balanced_gate: true
+        enable_hf_state_dict_adapter: false
+
+  megatron_cfg:
+    enabled: false
+    empty_unused_memory_level: 0
+    activation_checkpointing: false
+    converter_type: "Qwen2ForCausalLM"
+    tensor_model_parallel_size: 1
+    expert_tensor_parallel_size: 1
+    expert_model_parallel_size: 1
+    pipeline_model_parallel_size: 1
+    num_layers_in_first_pipeline_stage: null
+    num_layers_in_last_pipeline_stage: null
+    context_parallel_size: 1
+    pipeline_dtype: ${policy.precision}
+    sequence_parallel: false
+    freeze_moe_router: true
+    moe_router_dtype: "fp64"
+    moe_router_load_balancing_type: "none" # "seq_aux_loss" causes logprob error divergence for grpo
+    moe_router_bias_update_rate: 0.0 # by default, disable bias updates for grpo
+    moe_permute_fusion: false
+    #gives ~20% training perf speedup with sequence packing
+    apply_rope_fusion: True
+    defer_fp32_logits: null
+
+    optimizer:
+      optimizer: "adam"
+      lr: 5.0e-6
+      min_lr: 5.0e-7
+      weight_decay: 0.01
+      bf16: true
+      fp16: false
+      params_dtype: "float32"
+
+      #adam
+      adam_beta1: 0.9
+      adam_beta2: 0.999
+      adam_eps: 1e-8
+
+      #sgd
+      sgd_momentum: 0.9
+
+      #distributed optimizer
+      use_distributed_optimizer: true
+      use_precision_aware_optimizer: true
+
+      clip_grad: ${policy.max_grad_norm}
+
+    scheduler:
+      start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
+      end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
+      weight_decay_incr_style: "constant"
+      lr_decay_style: "constant"
+      lr_decay_iters: 1000
+      lr_warmup_iters: 13
+      lr_warmup_init: 5.0e-7
+
+    distributed_data_parallel_config:
+      grad_reduce_in_fp32: false
+      overlap_grad_reduce: true
+      overlap_param_gather: true
+      average_in_collective: true
+      use_custom_fsdp: false
+      data_parallel_sharding_strategy: "optim_grads_params"
+
+    env_vars: null
+
+  # See docs/design-docs/sequence-packing-and-dynamic-batching.md
+  # for more details on dynamic batching and sequence packing.
+  dynamic_batching:
+    enabled: False
+    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
+    logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}}
+    sequence_length_round: 64
+
+  sequence_packing:
+    enabled: True
+    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
+    logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}}
+    algorithm: "modified_first_fit_decreasing"
+    sequence_length_round: 64
+
+  # makes the training sequence length divisible by the tensor parallel size
+  # this is useful for sequence parallel training
+  make_sequence_length_divisible_by: ${policy.dtensor_cfg.tensor_parallel_size}
+  max_grad_norm: 1.0
+
+  optimizer:
+    name: "torch.optim.AdamW"
+    kwargs:
+      lr: 5.0e-6
+      weight_decay: 0.01
+      betas: [0.9, 0.999]
+      eps: 1e-8
+      # when using Dtensor, we need to set foreach
+      # and fused to False
+      foreach: False
+      fused: False
+
+  scheduler:
+    - name: "torch.optim.lr_scheduler.LinearLR"
+      kwargs:
+        start_factor: 0.1
+        end_factor: 1.0
+        total_iters: 50
+    - name: "torch.optim.lr_scheduler.ConstantLR"
+      kwargs:
+        factor: 1.0
+        total_iters: 10000000000
+    - milestones: [50]
+
+  generation:
+    backend: "vllm"
+    max_new_tokens: ${policy.max_total_sequence_length}
+    temperature: 1.0
+    top_p: 1.0
+    top_k: null
+    stop_token_ids: null
+    stop_strings: null
+    vllm_cfg:
+      async_engine: false
+      precision: ${policy.precision}
+      tensor_parallel_size: 1
+      pipeline_parallel_size: 1
+      enable_expert_parallel: false
+      gpu_memory_utilization: 0.6
+      max_model_len: ${policy.max_total_sequence_length}
+      # when enforce_eager is False, it is optional to set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy,
+      # with the flag, vllm will use the custom CUDA kernels instead of the Triton kernels generated by torch.compile
+      # for more details, see convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998
+      enforce_eager: False
+      use_deep_gemm: False
+      num_last_layers_in_bf16: 0
+      num_first_layers_in_bf16: 0
+      vllm_kwargs: {}
+    colocated:
+      # true: generation shares training GPUs
+      # false: uses dedicated generation resources
+      enabled: true
+      # only relevant when enabled is false
+      resources:
+        gpus_per_node: null # Decides num gpus to be dedicated to generation when there is one node in the cluster i.e cluster.num_nodes == 1
+        num_nodes: null # Decides number of nodes to be dedicated to generation
+
+data:
+  max_input_seq_length: ${policy.max_total_sequence_length} # upper bound, real truncation occurs at vllm.max_model_len
+  prompt_file: "examples/prompts/cot.txt"
+  system_prompt_file: null
+  dataset_name: "OpenMathInstruct-2"
+  shuffle: true
+
+env:
+  math:
+    num_workers: 8
+
+logger:
+  log_dir: "logs" # Base directory for all logs
+  num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal
+  wandb_enabled: false
+  tensorboard_enabled: false
+  mlflow_enabled: false # Disable MLflow logging
+  monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard
+  wandb:
+    project: "grpo-dev"
+    name: "grpo-dev-logger"
+  tensorboard: {}
+  mlflow:
+    experiment_name: "grpo-dev"
+    run_name: "grpo-dev-logger"
+  gpu_monitoring:
+    collection_interval: 10 # How often to collect GPU usage metrics (in seconds)
+    flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds)
+
+cluster:
+  gpus_per_node: 1
+  num_nodes: 1
diff --git a/examples/configs/sft_moonlight_16b_te_deepep.yaml b/examples/configs/sft_moonlight_16b_te_deepep.yaml
new file mode 100644
index 0000000000..b456ae82b8
--- /dev/null
+++ b/examples/configs/sft_moonlight_16b_te_deepep.yaml
@@ -0,0 +1,189 @@
+# SFT Algorithm Configuration
+sft:
+  ## total number of steps to train will equal
+  ## min((max_num_epochs * len(train_dataloader)), max_num_steps)
+  max_num_epochs: 1
+  max_num_steps: 60
+
+  val_period: 10
+  val_batches: 8
+  val_global_batch_size: 32
+  val_micro_batch_size: 1
+  val_at_start: true
+  seed: 42
+
+checkpointing:
+  enabled: true
+  checkpoint_dir: "results/sft"
+  metric_name: "val_loss" ## set to null to save most recent k checkpoints
+  higher_is_better: false
+  keep_top_k: 3
+  save_period: 10
+  checkpoint_must_save_by: null
+
+policy:
+  model_name: "moonshotai/Moonlight-16B-A3B"
+  tokenizer:
+    name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default
+  train_global_batch_size: 256
+  train_micro_batch_size: 4
+  generation_batch_size: 32 # Only used when generating using HF backend
+  logprob_batch_size: 4
+  max_total_sequence_length: 2048
+  precision: "bfloat16"
+  logprob_chunk_size: null
+
+  dtensor_cfg:
+    _v2: true
+    enabled: true
+    cpu_offload: False
+    sequence_parallel: false
+    activation_checkpointing: false
+    tensor_parallel_size: 1
+    context_parallel_size: 1
+    pipeline_parallel_size: 1
+    expert_parallel_size: 2
+    custom_parallel_plan: null
+
+    # v2 only attributes
+    parallelize_fn: nemo_automodel.components.models.deepseek_v3.parallelizer.parallelize_model
+    model:
+      _target_: nemo_automodel.components.models.deepseek_v3.model.DeepseekV3ForCausalLM.from_config
+      num_layers: null # Optionally override the number of hidden layers (null = use model default)
+      backend:
+        _target_: nemo_automodel.components.moe.utils.BackendConfig
+        attn: te
+        linear: te
+        rms_norm: te
+        enable_deepep: true
+        fake_balanced_gate: true
+        enable_hf_state_dict_adapter: true
+
+  dynamic_batching:
+    enabled: false
+    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
+    sequence_length_round: 64
+
+  sequence_packing:
+    enabled: False
+    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
+    algorithm: "modified_first_fit_decreasing"
+    sequence_length_round: 64
+
+  # makes the training sequence length divisible by the tensor parallel size
+  # this is useful for sequence parallel training
+  make_sequence_length_divisible_by: ${policy.dtensor_cfg.tensor_parallel_size}
+  max_grad_norm: 1.0
+
+  optimizer:
+    name: "torch.optim.AdamW"
+    kwargs:
+      lr: 5.0e-6
+      weight_decay: 0.1
+      betas: [0.9, 0.98]
+      eps: 1e-5
+      # when using Dtensor, we need to set foreach
+      # and fused to False
+      foreach: False
+      fused: False
+
+  ## ignored since enabled=false, but needed for testing purposes
+  megatron_cfg:
+    enabled: false
+    empty_unused_memory_level: 1
+    activation_checkpointing: false
+    tensor_model_parallel_size: 1
+    expert_tensor_parallel_size: 1
+    expert_model_parallel_size: 1
+    pipeline_model_parallel_size: 1
+    context_parallel_size: 1
+    pipeline_dtype: ${policy.precision}
+    num_layers_in_first_pipeline_stage: null
+    num_layers_in_last_pipeline_stage: null
+    sequence_parallel: false
+    freeze_moe_router: false
+    moe_router_dtype: null
+    moe_router_load_balancing_type: "aux_loss"
+    moe_router_bias_update_rate: 1e-3
+    moe_permute_fusion: false
+    #gives ~20% training perf speedup with sequence packing
+    apply_rope_fusion: True
+
+    optimizer:
+      optimizer: "adam"
+      lr: 5.0e-6
+      min_lr: 4.9999e-6
+      weight_decay: 0.1
+      bf16: false
+      fp16: false
+      params_dtype: "float32"
+
+      #adam
+      adam_beta1: 0.9
+      adam_beta2: 0.98
+      adam_eps: 1e-5
+
+      #sgd
+      sgd_momentum: 0.9
+
+      #distributed optimizer
+      use_distributed_optimizer: true
+      use_precision_aware_optimizer: true
+
+      clip_grad: ${policy.max_grad_norm}
+
+    scheduler:
+      start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
+      end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
+      weight_decay_incr_style: "constant"
+      lr_decay_style: "constant"
+      lr_decay_iters: 1000
+      lr_warmup_iters: 50
+      lr_warmup_init: 4.9999e-6
+
+    distributed_data_parallel_config:
+      grad_reduce_in_fp32: false
+      overlap_grad_reduce: true
+      overlap_param_gather: true
+      average_in_collective: true
+      data_parallel_sharding_strategy: "optim_grads_params"
+      use_custom_fsdp: false
+
+data:
+  max_input_seq_length: ${policy.max_total_sequence_length}
+  dataset_name: "squad"
+  add_bos: true
+  add_eos: true
+  add_generation_prompt: false
+  shuffle: true
+  num_workers: 20
+
+  ## unused with squad dataset
+  prompt_file: null
+  split: null
+  output_key: null
+  seed: null
+
+logger:
+  log_dir: "logs" # Base directory for all logs
+  wandb_enabled: true # Make sure you do a ``wandb login [Your API key]'' before running
+  tensorboard_enabled: false
+  mlflow_enabled: false
+  swanlab_enabled: false # Disable SwanLab logging
+  monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard
+  wandb:
+    project: "sft-moonlight-joyang"
+    name: "sft-dev-${data.dataset_name}-ep${policy.dtensor_cfg.expert_parallel_size}"
+  tensorboard:
+    log_dir: "tb_logs-sft-dev-${data.dataset_name}"
+  mlflow:
+    experiment_name: "sft-dev"
+    run_name: "sft-dev-${data.dataset_name}"
+
+  gpu_monitoring:
+    collection_interval: 10 # How often to collect GPU usage metrics (in seconds)
+    flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds)
+
+cluster:
+  gpus_per_node: 8
+  num_nodes: 1
diff --git a/nemo_rl/models/policy/dtensor_policy_worker_v2.py b/nemo_rl/models/policy/dtensor_policy_worker_v2.py
index ee5cea0b5d..c8a2f1261e 100644
--- a/nemo_rl/models/policy/dtensor_policy_worker_v2.py
+++ b/nemo_rl/models/policy/dtensor_policy_worker_v2.py
@@ -29,10 +29,16 @@
 from nemo_automodel.components._transformers.utils import (
     sliding_window_overwrite,
 )
+from nemo_automodel.components.checkpoint.checkpointing import (
+    load_model_from_base_checkpoint,
+    to_empty_parameters_only,
+)
+from nemo_automodel.components.config.loader import _resolve_target
 from nemo_automodel.components.distributed.cp_utils import (
     create_context_parallel_ctx,
     get_train_context,
 )
+from nemo_automodel.components.distributed.fsdp2 import FSDP2Manager
 from nemo_automodel.components.distributed.grad_utils import (
     clip_grad_by_total_norm_,
     get_grad_norm,
@@ -61,6 +67,7 @@
     AutoTokenizer,
 )
 from transformers.models.gemma3.modeling_gemma3 import Gemma3ForCausalLM
+from transformers.utils.hub import TRANSFORMERS_CACHE

 from nemo_rl.algorithms.interfaces import LossFunction, LossType
 from nemo_rl.algorithms.loss_functions import SequencePackingLossWrapper
@@ -170,7 +177,7 @@ def __init__(
             )
             print(f"[Rank {self.rank}] Using FlashAttention2 for sequence packing")

-        model_config = AutoConfig.from_pretrained(
+        self.model_config = AutoConfig.from_pretrained(
             model_name,
             # Always load the model in float32 to keep master weights in float32.
             # Keeping the master weights in lower precision has shown to cause issues with convergence.
@@ -184,8 +191,11 @@ def __init__(
             else None,
         )

+        self.model = None
+        self.use_custom_model = False  # Whether config is using custom model inside automodel.
+
         self.allow_flash_attn_args = self.check_model_allow_flash_attn_args(
-            model_config
+            self.model_config
         )

         self._is_reward_model = (
@@ -201,7 +211,7 @@ def __init__(
            rm_type = self.cfg["reward_model_cfg"]["reward_model_type"]
            if rm_type == "bradley_terry":
                model_class = NeMoAutoModelForSequenceClassification
-                if model_config.num_labels != 1:
+                if self.model_config.num_labels != 1:
                    # For Bradley-Terry reward models, the linear head has a single output.
                    # In the transformers library, the default setting for model_config.num_labels is 2
                    # (https://github.com/huggingface/transformers/blob/v4.52.4/src/transformers/configuration_utils.py#L259).
@@ -214,47 +224,101 @@ def __init__(
                        "model_config.num_labels is not 1. Setting it to 1 since this value is used as the out_features "
                        "for the linear head of Bradley-Terry reward models."
                    )
-                    model_config.num_labels = 1
+                    self.model_config.num_labels = 1
            else:
                raise ValueError(f"Unknown reward model type: {rm_type}")
        else:
            # DO NOT assume AutoModelForCausalLM, multimodal models can inherit from AutoModelForImageTextToText, AutoModelForTextToWaveform, etc.
-            model_class = resolve_model_class(model_config.model_type)
-
-        full_state_dict = None
-        model_state_dict_keys = None
-        if self.rank == 0:
-            print(f"[Rank {self.rank}] Loading model {model_name} on CPU...")
-            model = model_class.from_pretrained(
-                model_name,
-                device_map="cpu",  # load weights onto CPU initially
-                trust_remote_code=True,
-                config=model_config,
-                use_liger_kernel=False,
-                torch_dtype=str(model_config.torch_dtype),
-            )
+            if self.cfg["dtensor_cfg"]["model"]["_target_"] is not None:
+                self.use_custom_model = True
+                model_class = _resolve_target(
+                    self.cfg["dtensor_cfg"]["model"]["_target_"]
+                )
+                model_cfg = self.cfg["dtensor_cfg"]["model"]
+                # DeepSeek V3 does not support trust_remote_code=True, so reloading the config.
+                self.model_config = AutoConfig.from_pretrained(
+                    model_name,
+                    trust_remote_code=False,
+                    attn_implementation="flash_attention_2"
+                    if self.enable_seq_packing
+                    else None,
+                )
+
+                if model_cfg.get("num_layers", None) is not None:
+                    self.model_config.num_hidden_layers = model_cfg.get(
+                        "num_layers", None
+                    )
-            full_state_dict = model.state_dict()
-            # Store the original model state dict keys before any parallelization
-            model_state_dict_keys = list(full_state_dict.keys())
-            del model
+                # Init backend
+                if model_cfg.get("backend", None) is not None:
+                    backend_class = _resolve_target(
+                        model_cfg.get("backend", None)["_target_"]
+                    )
+                    backend_kwargs = model_cfg.get("backend")
+                    backend_kwargs.pop("_target_")
+                    backend = backend_class(
+                        **backend_kwargs,
+                    )
+                else:
+                    backend = None
+
+                model_kwargs = {
+                    "pretrained_model_name_or_path": self.model_config,
+                    "backend": backend,
+                }
+
+                self.model = model_class(
+                    **model_kwargs,
+                )
+                print(f"model_class: {model_class}, model type: {type(self.model)}")
+
+            else:
+                model_class = resolve_model_class(self.model_config.model_type)
+                print(f"model_class: {model_class}")
+
+        if not self.use_custom_model:
+            full_state_dict = None
+            model_state_dict_keys = None
+            if self.rank == 0:
+                print(f"[Rank {self.rank}] Loading model {model_name} on CPU...")
+                model = model_class.from_pretrained(
+                    model_name,
+                    device_map="cpu",  # load weights onto CPU initially
+                    trust_remote_code=True,
+                    config=self.model_config,
+                    use_liger_kernel=False,
+                    torch_dtype=str(self.model_config.torch_dtype),
+                )
+
+                full_state_dict = model.state_dict()
+                # Store the original model state dict keys before any parallelization
+                model_state_dict_keys = list(full_state_dict.keys())
+                del model

        print(f"[Rank {self.rank}] Initializing empty model for FSDP...")
        # All ranks initialize model on meta device, so FSDP can shard it.
        # The actual weights will be broadcast from rank 0.
-        with init_empty_weights():
-            # NeMoAutoModelForCausalLM uses flash_attention_2 by default
-            # so we need to set it to None if sequence packing is disabled
-            # https://github.com/NVIDIA-NeMo/Automodel/blob/7e748be260651349307862426c0c168cebdeeec3/nemo_automodel/components/_transformers/auto_model.py#L180
-            self.model = model_class.from_config(
-                model_config,
-                attn_implementation="flash_attention_2"
-                if self.enable_seq_packing
-                else None,
-                use_liger_kernel=False,
-                trust_remote_code=True,
-                torch_dtype=str(model_config.torch_dtype),
+        if not self.use_custom_model:
+            with init_empty_weights():
+                # NeMoAutoModelForCausalLM uses flash_attention_2 by default
+                # so we need to set it to None if sequence packing is disabled
+                # https://github.com/NVIDIA-NeMo/Automodel/blob/7e748be260651349307862426c0c168cebdeeec3/nemo_automodel/components/_transformers/auto_model.py#L180
+                self.model = model_class.from_config(
+                    self.model_config,
+                    attn_implementation="flash_attention_2"
+                    if self.enable_seq_packing
+                    else None,
+                    use_liger_kernel=False,
+                    trust_remote_code=True,
+                    torch_dtype=str(self.model_config.torch_dtype),
+                )
+        else:
+            to_empty_parameters_only(
+                self.model, device=torch.device("cuda"), dtype=self.dtype
+            )
+            self.model.initialize_weights(
+                buffer_device=torch.device("cuda"), dtype=self.dtype
            )

        if self.model.config.pad_token_id is None:
@@ -262,111 +326,89 @@
        tp_size = self.cfg["dtensor_cfg"]["tensor_parallel_size"]
        cp_size = self.cfg["dtensor_cfg"]["context_parallel_size"]
-        if cp_size > 1 and self.enable_seq_packing:
-            raise ValueError(
-                "Context parallel is not supported for sequence packing. Refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details."
-            )
-        dp_size = world_size // tp_size // cp_size
+        ep_size = self.cfg["dtensor_cfg"]["expert_parallel_size"]
+        pp_size = self.cfg["dtensor_cfg"]["pipeline_parallel_size"]
        sequence_parallel_enabled = self.cfg["dtensor_cfg"]["sequence_parallel"]
-        assert world_size == dp_size * tp_size * cp_size, (
-            f"World size({world_size}) must equal to dp_size({dp_size}) * tp_size({tp_size}) * cp_size({cp_size}) to use DTensor"
-        )
-
-        if sequence_parallel_enabled and tp_size == 1:
-            print(
-                "[WARNING]: sequence_parallel=True, but tp_size=1 which has no effect. Enable tp_size > 1 to use sequence parallelism."
-            )
-
-        if cp_size > 1:
-            assert not isinstance(self.model, Gemma3ForCausalLM), (
-                "Context parallel is not supported for Gemma3ForCausalLM. Torch context parallel has many limitations. "
-                "Please refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details."
-            )
-
-            assert not (tp_size > 1 and sequence_parallel_enabled), (
-                "It's a known issue that context parallel can't be used together with sequence parallel in DTensor worker. "
-                "Please either set cp_size = 1 or disable sequence parallel. "
-                "See https://github.com/NVIDIA-NeMo/RL/issues/659 for more details."
-            )
-
-            assert not self.is_vlm, (
-                "Context parallel is yet not supported for VLM models. Please set cp_size = 1 to train VLM models."
-            )

        # For FSDP2 compatibility, we need to support HSDP structure
        # For now, we use dp_replicate_size = 1 (no hybrid sharding)
        dp_replicate_size = 1
-        dp_shard_size = dp_size
-
-        # Create device mesh with HSDP structure for FSDP2 compatibility
-        device_mesh = torch.distributed.device_mesh.init_device_mesh(
-            "cuda",
-            (dp_replicate_size, dp_shard_size, cp_size, tp_size),
-            mesh_dim_names=("dp_replicate", "dp_shard", "cp", "tp"),
+        self.distributed = self._get_distributed(
+            dp_replicate_size=1,
+            tp_size=tp_size,
+            cp_size=cp_size,
+            sequence_parallel_enabled=sequence_parallel_enabled,
+            pp_size=pp_size,
+            ep_size=ep_size,
+            world_size=world_size,
        )

-        # Create flattened submeshes for different use cases
-        # Flatten dp_replicate + dp_shard for the "dp" dimension (backward compatibility)
-        device_mesh[("dp_replicate", "dp_shard")]._flatten(mesh_dim_name="dp")
-
-        # Flatten dp_shard + cp for FSDP2 sharding
-        device_mesh[("dp_shard", "cp")]._flatten(mesh_dim_name="dp_shard_cp")
-
-        # Flatten dp_replicate + dp_shard + cp for gradient operations
-        device_mesh[("dp_replicate", "dp_shard", "cp")]._flatten(mesh_dim_name="dp_cp")
-
-        # Store mesh references for backward compatibility
-        self.dp_cp_mesh = device_mesh["dp_cp"]
-        self.dp_mesh = device_mesh["dp"]
-        self.tp_mesh = device_mesh["tp"]
-        self.cp_mesh = device_mesh["cp"]
-
-        self.dp_size = dp_size
-        self.tp_size = tp_size
-        self.cp_size = cp_size
-        self.device_mesh = device_mesh
+        self._setup_parallel(
+            dp_replicate_size=dp_replicate_size,
+            tp_size=tp_size,
+            cp_size=cp_size,
+            pp_size=pp_size,
+            ep_size=ep_size,
+            sequence_parallel_enabled=sequence_parallel_enabled,
+        )

        # ------------------------------------------------
        # 3) Move to GPU + Composable FSDP
        # (Initialize device mesh, shard submodules, then shard entire model)
        # ------------------------------------------------
-        self.model = fsdp2_strategy_parallelize(
-            self.model,
-            device_mesh=self.device_mesh,
-            mp_policy=MixedPrecisionPolicy(
-                param_dtype=self.dtype,
-                reduce_dtype=torch.float32,
-                output_dtype=torch.float32,
-            ),
-            offload_policy=CPUOffloadPolicy(pin_memory=False)
-            if self.cpu_offload
-            else OffloadPolicy(),
-            sequence_parallel=sequence_parallel_enabled,
-            activation_checkpointing=self.cfg["dtensor_cfg"][
-                "activation_checkpointing"
-            ],
-            tp_shard_plan=self.cfg["dtensor_cfg"]["custom_parallel_plan"],
-            dp_replicate_mesh_name="dp_replicate",
-            dp_shard_cp_mesh_name="dp_shard_cp",
-            tp_mesh_name="tp",
-        )
+        if self.cfg["dtensor_cfg"]["parallelize_fn"] is not None:
+            parallelize_func = _resolve_target(
+                self.cfg["dtensor_cfg"]["parallelize_fn"]
+            )
+            parallelize_func(
+                self.model,
+                world_mesh=self.distributed.device_mesh,
+                moe_mesh=self.distributed.moe_mesh,
+                pp_enabled=False,
+                dp_axis_names=("dp_shard",),
+                cp_axis_name="cp",
+                tp_axis_name="tp",
+                ep_axis_name="ep",
+                ep_shard_axis_names=("ep_shard",) if ep_size > 1 else None,
+            )
+        else:
+            self.model = fsdp2_strategy_parallelize(
+                self.model,
+                device_mesh=self.device_mesh,
+                mp_policy=MixedPrecisionPolicy(
+                    param_dtype=self.dtype,
+                    reduce_dtype=torch.float32,
+                    output_dtype=torch.float32,
+                ),
+                offload_policy=CPUOffloadPolicy(pin_memory=False)
+                if self.cpu_offload
+                else OffloadPolicy(),
+                sequence_parallel=sequence_parallel_enabled,
+                activation_checkpointing=self.cfg["dtensor_cfg"][
+                    "activation_checkpointing"
+                ],
+                tp_shard_plan=self.cfg["dtensor_cfg"]["custom_parallel_plan"],
+                dp_replicate_mesh_name="dp_replicate",
+                dp_shard_cp_mesh_name="dp_shard_cp",
+                tp_mesh_name="tp",
+            )

        print(f"[Rank {self.rank}] Loading state dict from rank 0...")
+
        # This will broadcast the state dict from rank 0 to all other ranks
        # and load it into the FSDP model.
-        set_model_state_dict(
+        load_model_from_base_checkpoint(
            self.model,
-            model_state_dict=full_state_dict,
-            options=StateDictOptions(
-                full_state_dict=True,
-                broadcast_from_rank0=True,
-            ),
+            torch.cuda.current_device(),
+            False,
+            TRANSFORMERS_CACHE,
+            model_name,
+            None,
+            device_mesh=self.distributed.device_mesh,
+            moe_mesh=self.distributed.moe_mesh,
        )

-        # Broadcast model state dict keys to all ranks and store as instance variable
-        keys_to_broadcast = [model_state_dict_keys]
-        torch.distributed.broadcast_object_list(keys_to_broadcast, src=0)
-        self.model_state_dict_keys = keys_to_broadcast[0]
+        print(f"[Rank {self.rank}] Loading state dict done.")

        # Handle tied word embeddings after loading the state dict
        # We need to actually tie the parameters at the model level
@@ -459,9 +501,105 @@ def _apply_temperature_scaling(self, logits: torch.Tensor) -> torch.Tensor:
        logits.div_(self.cfg["generation"]["temperature"])
        return logits

-    def init_collective(
-        self, ip: str, port: int, world_size: int, *, train_world_size: int
-    ) -> None:
+    def _setup_parallel(
+        self,
+        dp_replicate_size: int,
+        tp_size: int,
+        cp_size: int,
+        pp_size: int,
+        ep_size: int,
+        sequence_parallel_enabled: bool,
+    ):
+        # Setup parallel dims
+        self.dp_replicate_size = self.distributed.dp_replicate_size
+        assert self.dp_replicate_size == dp_replicate_size, (
+            f"dp_replicate_size does not match, expected {dp_replicate_size} but got {self.dp_replicate_size}"
+        )
+
+        # dp_size is inferred from the distributed environment
+        self.dp_size = self.distributed.dp_size
+
+        self.tp_size = self.distributed.tp_size
+        assert self.tp_size == tp_size, (
+            f"tp_size does not match, expected {tp_size} but got {self.tp_size}"
+        )
+        self.cp_size = self.distributed.cp_size
+        assert self.cp_size == cp_size, (
+            f"cp_size does not match, expected {cp_size} but got {self.cp_size}"
+        )
+        self.pp_size = self.distributed.pp_size
+        assert self.pp_size == pp_size, (
+            f"pp_size does not match, expected {pp_size} but got {self.pp_size}"
+        )
+        self.ep_size = self.distributed.ep_size
+        assert self.ep_size == ep_size, (
+            f"ep_size does not match, expected {ep_size} but got {self.ep_size}"
+        )
+
+        self.sequence_parallel_enabled = self.distributed.sequence_parallel
+        assert self.sequence_parallel_enabled == sequence_parallel_enabled, (
+            f"sequence_parallel_enabled does not match, expected {sequence_parallel_enabled} but got {self.sequence_parallel_enabled}"
+        )
+
+        # Setup parallel mesh
+        self.dp_mesh = self.distributed.device_mesh["dp"]
+        self.tp_mesh = self.distributed.device_mesh["tp"]
+        self.cp_mesh = self.distributed.device_mesh["cp"]
+        self.pp_mesh = self.distributed.device_mesh["pp"]
+        self.ep_mesh = (
+            self.distributed.moe_mesh["ep"] if self.distributed.ep_size > 1 else None
+        )
+        self.dp_cp_mesh = self.distributed.device_mesh["dp_shard_cp"]
+        self.device_mesh = self.distributed.device_mesh
+
+        if self.cp_size > 1 and self.enable_seq_packing:
+            raise ValueError(
+                "Context parallel is not supported for sequence packing. Refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details."
+            )
+
+        if self.sequence_parallel_enabled and self.tp_size == 1:
+            print(
+                "[WARNING]: sequence_parallel=True, but tp_size=1 which has no effect. Enable tp_size > 1 to use sequence parallelism."
+            )
+
+        if self.cp_size > 1:
+            assert not isinstance(self.model, Gemma3ForCausalLM), (
+                "Context parallel is not supported for Gemma3ForCausalLM. Torch context parallel has many limitations. "
+                "Please refer to https://github.com/NVIDIA/NeMo-RL/blob/main/docs/model-quirks.md#context-parallel-with-fsdp2 for more details."
+            )
+
+            assert not (self.tp_size > 1 and self.sequence_parallel_enabled), (
+                "It's a known issue that context parallel can't be used together with sequence parallel in DTensor worker. "
+                "Please either set cp_size = 1 or disable sequence parallel. "
+                "See https://github.com/NVIDIA-NeMo/RL/issues/659 for more details."
+            )
+
+            assert not self.is_vlm, (
+                "Context parallel is yet not supported for VLM models. Please set cp_size = 1 to train VLM models."
+            )
+
+    def _get_distributed(
+        self,
+        dp_replicate_size: int,
+        tp_size: int,
+        cp_size: int,
+        sequence_parallel_enabled: bool,
+        pp_size: int,
+        ep_size: int,
+        world_size: int,
+    ):
+        return FSDP2Manager(
+            dp_replicate_size=dp_replicate_size,
+            tp_size=tp_size,
+            cp_size=cp_size,
+            sequence_parallel=sequence_parallel_enabled,
+            pp_size=pp_size,
+            ep_size=ep_size,
+            world_size=world_size,
+        )
+
+    def init_collective(self, ip: str, port: int, world_size: int) -> None:
+        """Initialize the collective communication."""
        from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator
        from vllm.distributed.utils import StatelessProcessGroup
diff --git a/nemo_rl/utils/flops_tracker.py b/nemo_rl/utils/flops_tracker.py
index 5bf462b2cb..cff615e0fd 100644
--- a/nemo_rl/utils/flops_tracker.py
+++ b/nemo_rl/utils/flops_tracker.py
@@ -171,7 +171,8 @@ def track(self, n_samples: int, padded_seq_len: int):
        }

        # Compute and accumulate flops
-        flops = self.flops_formula(FLOPSConfig(**config_dict))
+        # flops = self.flops_formula(FLOPSConfig(**config_dict))
+        flops = 0
        self.total_flops += flops

    def track_batch(self, sequence_lengths: list[int]):
diff --git a/nemo_rl/utils/venvs.py b/nemo_rl/utils/venvs.py
index 117a54409d..ee687b396d 100644
--- a/nemo_rl/utils/venvs.py
+++ b/nemo_rl/utils/venvs.py
@@ -33,6 +33,7 @@
 def create_local_venv(
     py_executable: str, venv_name: str, force_rebuild: bool = False
 ) -> str:
+    import os
     """Create a virtual environment using uv and execute a command within it.

     The output can be used as a py_executable for a Ray worker assuming the worker
@@ -77,6 +78,9 @@
     logger.info(f"Creating new venv at {venv_path}")

+    # for env_var in os.environ:
+    #     logger.info(f"{env_var}: {os.environ[env_var]}")
+
     # Create the virtual environment
     uv_venv_cmd = ["uv", "venv", "--allow-existing", venv_path]
     subprocess.run(uv_venv_cmd, check=True)
@@ -88,6 +92,9 @@
     # context.
     # https://docs.astral.sh/uv/concepts/projects/config/#project-environment-path
     env["UV_PROJECT_ENVIRONMENT"] = venv_path
+    # TODO: joayng WAR for now.
+ env["CUDA_VISIBLE_DEVICES"] = "0" + logger.info(f"UV_PROJECT_ENVIRONMENT: {env['CUDA_VISIBLE_DEVICES']}") # Split the py_executable into command and arguments exec_cmd = shlex.split(py_executable) diff --git a/pyproject.toml b/pyproject.toml index 36e24a6365..bca4573822 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,10 @@ automodel = [ "flash-attn==2.7.4.post1", "mamba-ssm", "causal-conv1d", + "deep_gemm @ git+https://github.com/deepseek-ai/DeepGEMM.git@7b6b5563b9d4c1ae07ffbce7f78ad3ac9204827c", + "deep_ep @ git+https://github.com/deepseek-ai/DeepEP.git@e3908bf5bd0cc6265bcb225d15cd8c996d4759ef", + "grouped_gemm @ git+https://github.com/fanshiqing/grouped_gemm@v1.1.4", + "transformer-engine[pytorch]==2.5.0", ] vllm = [ "cuda-python", @@ -181,7 +185,7 @@ url = "https://download.pytorch.org/whl/cu128" explicit = true [tool.uv] -no-build-isolation-package = ["transformer-engine-torch", "transformer-engine", "flash-attn", "mamba-ssm", "causal-conv1d", "deep_gemm", "deep_ep"] +no-build-isolation-package = ["transformer-engine-torch", "transformer-engine", "flash-attn", "mamba-ssm", "causal-conv1d", "deep_gemm", "deep_ep", "grouped_gemm"] # Always apply the build group since dependencies like TE/mcore/nemo-run require build dependencies # and this lets us assume they are implicitly installed with a simply `uv sync`. Ideally, we'd # avoid including these in the default dependency set, but for now it's required. @@ -192,6 +196,13 @@ default-groups = ["dev", "build"] # --link-mode=symlink (fastest option when uv cache and venv on different file-system; caveat: venv is brittle since it depends on the environment/container) link-mode = "copy" +[tool.uv.extra-build-dependencies] +flash-attn = [{ requirement = "torch", match-runtime = true }] +deep_ep = [{ requirement = "torch", match-runtime = true }] +deep_gemm = [{ requirement = "torch", match-runtime = true }] +#grouped_gemm = [{ requirement = "torch", match-runtime = true }] + + # Needed when building from source [[tool.uv.dependency-metadata]] name = "flash-attn" diff --git a/uv.lock b/uv.lock index f93fc1053e..8495f80c5b 100644 --- a/uv.lock +++ b/uv.lock @@ -1549,6 +1549,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/34/b2/056fd4642637cd4627d59ccf2be3f62dd41b8da98e49300eeecd8d4faaa5/grimp-3.9-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:9ddcbfd11d6e6b813121db1116f6b3c4930ab433a949522b5e80542c5da3d805", size = 2092059, upload-time = "2025-05-05T13:46:41.095Z" }, ] +[[package]] +name = "grouped-gemm" +version = "1.1.4" +source = { git = "https://github.com/fanshiqing/grouped_gemm?rev=v1.1.4#172fada89fa7364fe5d026b3a0dfab58b591ffdd" } +dependencies = [ + { name = "absl-py" }, + { name = "numpy" }, + { name = "torch", version = "2.7.1", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'darwin'" }, + { name = "torch", version = "2.7.1+cu128", source = { registry = "https://download.pytorch.org/whl/cu128" }, marker = "sys_platform != 'darwin'" }, +] + [[package]] name = "grpcio" version = "1.74.0" @@ -2936,9 +2947,13 @@ dependencies = [ [package.optional-dependencies] automodel = [ { name = "causal-conv1d" }, + { name = "deep-ep" }, + { name = "deep-gemm" }, { name = "flash-attn" }, + { name = "grouped-gemm" }, { name = "mamba-ssm" }, { name = "nemo-automodel" }, + { name = "transformer-engine", extra = ["pytorch"] }, { name = "vllm" }, ] mcore = [ @@ -3004,11 +3019,14 @@ requires-dist = [ { name = "cuda-python", marker = "extra == 'vllm'" }, { name = "datasets", specifier = 
">=4.0.0" }, { name = "debugpy" }, + { name = "deep-ep", marker = "extra == 'automodel'", git = "https://github.com/deepseek-ai/DeepEP.git?rev=e3908bf5bd0cc6265bcb225d15cd8c996d4759ef" }, { name = "deep-ep", marker = "extra == 'vllm'", git = "https://github.com/deepseek-ai/DeepEP.git?rev=e3908bf5bd0cc6265bcb225d15cd8c996d4759ef" }, + { name = "deep-gemm", marker = "extra == 'automodel'", git = "https://github.com/deepseek-ai/DeepGEMM.git?rev=7b6b5563b9d4c1ae07ffbce7f78ad3ac9204827c" }, { name = "deep-gemm", marker = "extra == 'vllm'", git = "https://github.com/deepseek-ai/DeepGEMM.git?rev=7b6b5563b9d4c1ae07ffbce7f78ad3ac9204827c" }, { name = "flash-attn", marker = "extra == 'automodel'", specifier = "==2.7.4.post1" }, { name = "flash-attn", marker = "extra == 'mcore'", specifier = "==2.7.4.post1" }, { name = "flash-attn", marker = "extra == 'vllm'", specifier = "==2.7.4.post1" }, + { name = "grouped-gemm", marker = "extra == 'automodel'", git = "https://github.com/fanshiqing/grouped_gemm?rev=v1.1.4" }, { name = "hydra-core" }, { name = "mamba-ssm", marker = "extra == 'automodel'", git = "https://github.com/state-spaces/mamba.git?rev=2e16fc3062cdcd4ebef27a9aa4442676e1c7edf4" }, { name = "mamba-ssm", marker = "extra == 'vllm'", git = "https://github.com/state-spaces/mamba.git?rev=2e16fc3062cdcd4ebef27a9aa4442676e1c7edf4" }, @@ -3040,6 +3058,7 @@ requires-dist = [ { name = "torchdata" }, { name = "torchvision", marker = "sys_platform != 'darwin'", specifier = ">=0.22.0", index = "https://download.pytorch.org/whl/cu128" }, { name = "torchvision", marker = "sys_platform == 'darwin'", specifier = ">=0.22.0", index = "https://pypi.org/simple" }, + { name = "transformer-engine", extras = ["pytorch"], marker = "extra == 'automodel'", specifier = "==2.5.0" }, { name = "transformer-engine", extras = ["pytorch"], marker = "extra == 'mcore'", specifier = "==2.5.0" }, { name = "transformers", specifier = ">=4.55.4" }, { name = "triton", marker = "sys_platform != 'darwin'", index = "https://download.pytorch.org/whl/cu128" },