Commit e50e391

Merge pull request #184 from NVIDIA/trace_collector_to_nvrx_main
NVRX Logger: backtrace indentation fix and trace collector to use nvrx
2 parents b34d626 + ba67445 commit e50e391

File tree: 4 files changed, +42 -12 lines
src/nvidia_resiliency_ext/attribution/trace_analyzer/trace_collector.py

Lines changed: 5 additions & 4 deletions

@@ -13,8 +13,9 @@
 
 from nvidia_resiliency_ext.attribution.utils import capture_logs
 from nvidia_resiliency_ext.shared_utils.health_check import GPUHealthCheck, NicHealthCheck
+from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig
 
-logger = logging.getLogger(__name__)
+logger = logging.getLogger(LogConfig.name)
 
 
 class TraceCollector(ABC):
@@ -65,6 +66,7 @@ def __init__(
         self.stack_trace = None
         self.dump_fn = torch._C._distributed_c10d._dump_nccl_trace
         self.json = json
+        logger = logging.getLogger(LogConfig.name)
         logger.info(f"{self.rank} created TorchFRTraceCollector")
 
     def collect(self):
@@ -112,11 +114,10 @@ def get_health_check_results(local_rank: int):
     - Returns the bypassed output strings for GPU and NIC health checks
     """
     health_check_results = {}
-
-    with capture_logs() as stderr_gpu:
+    with capture_logs(LogConfig.name) as stderr_gpu:
         gpu_health_check = GPUHealthCheck(device_index=local_rank)
         gpu_health = gpu_health_check._perform_health_check()
-    with capture_logs() as stderr_nic:
+    with capture_logs(LogConfig.name) as stderr_nic:
         nic_health_check = NicHealthCheck()
         nic_health_check.set_nic_device(local_rank)
         nic_health = nic_health_check._perform_health_check()
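
With both the module-level logger and the capture_logs() calls now keyed to LogConfig.name, the health-check output is intercepted from the shared NVRX logger rather than a per-module logger. Below is a minimal, self-contained sketch of that capture-by-logger-name pattern using only the standard library; capture_named_logs and the logger name "nvrx" are illustrative stand-ins, not the repository's capture_logs implementation or the real LogConfig.name value.

import contextlib
import io
import logging

NVRX_LOGGER_NAME = "nvrx"  # assumed stand-in for LogConfig.name

@contextlib.contextmanager
def capture_named_logs(logger_name: str):
    # Temporarily attach a StreamHandler to the named logger and yield its buffer,
    # so records emitted anywhere under that logger name can be inspected afterwards.
    buf = io.StringIO()
    handler = logging.StreamHandler(buf)
    logger = logging.getLogger(logger_name)
    logger.addHandler(handler)
    try:
        yield buf
    finally:
        logger.removeHandler(handler)

logging.getLogger(NVRX_LOGGER_NAME).setLevel(logging.INFO)
with capture_named_logs(NVRX_LOGGER_NAME) as captured:
    logging.getLogger(NVRX_LOGGER_NAME).info("GPU health check passed")
print(captured.getvalue())  # "GPU health check passed"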

src/nvidia_resiliency_ext/fault_tolerance/rank_monitor_server.py

Lines changed: 0 additions & 4 deletions

@@ -539,10 +539,6 @@ def run(
 
         try:
             setup_logger(force_reset=True, node_local_tmp_prefix="rankmonsvr")
-            rmlogger = RankMonitorLogger(
-                level=cfg.log_level, is_restarter_logger=is_restarter_logger
-            )
-
             logger = logging.getLogger(LogConfig.name)
 
             logger.debug(f"Starting RankMonitorServer... PID={os.getpid()}")

src/nvidia_resiliency_ext/shared_utils/log_node_local_tmp.py

Lines changed: 18 additions & 4 deletions

@@ -154,8 +154,10 @@ class LogMessage:
     def __init__(self, log_message: str):
         self.log_message = log_message
         self.hash_table = {}
+        self.log_message_valid = False
         match = LogMessage.log_pattern.match(log_message)
         if match:
+            self.log_message_valid = True
             log_fields = match.groupdict()
             for key, value in log_fields.items():
                 if key == 'asctime':
@@ -220,7 +222,7 @@ def _write_messages_to_file(self, messages: List[LogMessage], output):
         for msg in messages:
             try:
                 # The message is already formatted by the formatter, just write it
-                output.write(msg.log_message + '\n')
+                output.write(msg.log_message)
                 output.flush()
             except Exception as e:
                 # Fallback to stderr if output fails
@@ -366,13 +368,25 @@ def _process_message_file(self, msg_file: str):
             return
 
         # Process each line
+        # Multi-line logs (e.g., tracebacks) have a single header line (matches log_pattern)
+        # followed by one or more continuation lines. A non-header line is treated as a
+        # continuation of the previous record, and the entire block is collapsed into one log message.
         log_msg_q = queue.SimpleQueue()
+        old_log_msg: LogMessage = None
         for line in lines:
-            line = line.strip()
-            if not line:
+            lineChk = line.strip()
+            if not lineChk:
                 continue
             log_msg = LogMessage(line)
-            log_msg_q.put(log_msg)
+            if log_msg.log_message_valid:
+                old_log_msg = log_msg
+                log_msg_q.put(log_msg)
+            else:
+                if old_log_msg is not None:
+                    old_log_msg.log_message += line
+                else:
+                    old_log_msg = log_msg
+                    log_msg_q.put(log_msg)
 
         self._log_dict_queue[msg_file] = log_msg_q
 
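
The loop above folds each continuation line into the preceding record; because the raw line (with its original indentation and trailing newline) is appended rather than the stripped copy, traceback indentation survives aggregation, which is also why _write_messages_to_file no longer appends its own '\n'. A self-contained sketch of the same header/continuation collapsing rule is shown below; the header regex is a deliberately simplified stand-in, since the repository's LogMessage.log_pattern parses the full NVRX header fields.

import re

# Simplified stand-in for LogMessage.log_pattern; the real pattern parses the
# full NVRX log header, not just a timestamp and level.
HEADER_RE = re.compile(r"^\[\d{4}-\d{2}-\d{2} .+?\] \[\w+\] ")

def collapse_multiline(lines):
    # A header line starts a new record; any non-header, non-blank line is a
    # continuation and is appended (unstripped) to the previous record.
    records = []
    for line in lines:
        if not line.strip():
            continue
        if HEADER_RE.match(line) or not records:
            records.append(line)
        else:
            records[-1] += line
    return records

raw = [
    "[2025-01-01 00:00:00] [ERROR] Traceback (most recent call last):\n",
    '  File "store.py", line 303, in reentrant_barrier\n',
    "    self.wait([key], timeout)\n",
    "[2025-01-01 00:00:01] [INFO] next message\n",
]
print(len(collapse_multiline(raw)))  # 2 -- the traceback collapses into one record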

tests/shared_utils/test_logger.py

Lines changed: 19 additions & 0 deletions

@@ -19,6 +19,7 @@
 import os
 import random
 import shutil
+import textwrap
 import time
 import unittest
 from datetime import datetime
@@ -75,6 +76,21 @@ def gen_log_msg(logger, num_msg, log_type="info"):
             logger.info(f"My Info Logging Message {i}")
         if log_type == "debug":
             logger.debug(f"My Debug Logging Message {i}")
+        if log_type == "error":
+            msg = textwrap.dedent(
+                """\
+                monitor_process.py:316 Traceback (most recent call last):
+                  File "/usr/local/lib/python3.12/dist-packages/nvidia_resiliency_ext/inprocess/monitor_process.py", line 297, in run
+                    store.iteration_barrier(
+
+                  File "/usr/local/lib/python3.12/dist-packages/nvidia_resiliency_ext/inprocess/store.py", line 303, in reentrant_barrier
+                    self.wait([last_worker_arrived_key], timeout_chunk)
+                torch.distributed.DistNetworkError: Failed to recv, got 0 bytes. Connection was likely closed. Did the remote server shutdown or crash?
+
+
+                """
+            )
+            logger.error(msg)
 
 
 def worker_process(id, num_msg, file_size):
@@ -227,6 +243,9 @@ def test_single_msg(self):
             num_msg=1, file_size_kb=1024, pm_files=1, is_agg=True, log_type="info", dbg_on="0"
         )
 
+    def test_traceback_msg(self):
+        self.check_msg(2, 1024, 1, True, "error", "0")
+
     def test_single_dbg_msg(self):
         self.check_msg(
             num_msg=1, file_size_kb=1024, pm_files=1, is_agg=True, log_type="debug", dbg_on="1"