@@ -36,11 +36,59 @@

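- **Format**: a comma-separated list of **field names**, for example `TRAIN_JOB_ID,RUNNING_ROUND`.
- **Value rules**:
  - **Environment variable**: if the name is in the list and is not a "built-in computed field", the value comes from `getenv(name)`; if the variable is unset or empty, that segment is not written.
  - **Built-in computed field**: currently only `RUNNING_ROUND`, computed by `get_running_round(POD)` (the last segment of the POD name, e.g. `worker-0-8` → `8`); the environment variable `RUNNING_ROUND` is not read.
  - **Environment variable**: if the name is not registered as a built-in computed field, the value comes from `getenv(name)`; if the variable is unset or empty, that segment is not written.
  - **Built-in computed field**: currently only `RUNNING_ROUND`, computed by a resolver from the POD name (the last `-`-separated segment, e.g. a name ending in `worker-0-8` → `8`; names with fewer than 8 dashes yield `0`); the environment variable `RUNNING_ROUND` is not read.
- **Output order**: same as the configured order, always before `[node_ip] [hostname]`.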
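
The value rules above can be sketched for the environment-variable case as follows. This is a minimal illustration, not the code in `ring_log.cc`: the helper names `split_field_names` and `build_env_prefix` are hypothetical, and how Megatrace actually splits `MEGATRACE_LOG_EXTRA_FIELDS` (for example, whether it trims whitespace) is not shown in this change.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: split "A,B,C" into {"A", "B", "C"}, skipping empty names.
std::vector<std::string> split_field_names(const std::string& csv) {
    std::vector<std::string> names;
    std::stringstream ss(csv);
    std::string name;
    while (std::getline(ss, name, ',')) {
        if (!name.empty()) {
            names.push_back(name);
        }
    }
    return names;
}

// Hypothetical helper: build the optional prefix from environment variables only.
// Segments appear in configuration order; unset or empty variables add nothing.
std::string build_env_prefix(const std::vector<std::string>& names) {
    std::string prefix;
    for (const auto& name : names) {
        const char* val = std::getenv(name.c_str());
        if (val && *val != '\0') {
            prefix += "[";
            prefix += val;
            prefix += "] ";
        }
    }
    return prefix;
}
```

With `TRAIN_JOB_ID=job-xxx` set and `CLUSTER_ID` unset, `build_env_prefix(split_field_names("TRAIN_JOB_ID,CLUSTER_ID"))` yields `[job-xxx] `, which would then be followed by `[node_ip] [hostname]`.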

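To add a new "computed field", branch on the name inside the ring_log line-writing logic and write the corresponding computed value.
### 1.3.1 Extending built-in computed fields (decoupled design)

The computation logic has been decoupled from the line-writing loop and now uses a **registry**. Adding a built-in field no longer requires changing the loop in `ring_log.cc`.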

- **Location**: `trace/intercept/extra_field_resolver.h`, `extra_field_resolver.cc`
- **Steps (adding a built-in computed field, e.g. `MY_ROUND`)**:
  1. Implement a resolver function in `extra_field_resolver.cc`, e.g. `std::string resolve_my_round(const ExtraFieldContext& ctx)`, which computes a string from context such as `ctx.pod_name` and `ctx.save_iter`.
  2. Register it in `init_builtin_extra_resolvers()`: `register_extra_field_resolver("MY_ROUND", resolve_my_round)`.
  3. Rebuild the intercept .so and set `MEGATRACE_LOG_EXTRA_FIELDS=...,MY_ROUND,...` in the runtime environment.
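
A minimal, self-contained sketch of the register-then-look-up flow follows. The `sketch` namespace re-declares stand-ins for the types in `extra_field_resolver.h` so the example compiles on its own, and the rule used for `MY_ROUND` (the save iteration as a decimal string) is purely an assumption for illustration.

```cpp
#include <cstdlib>
#include <map>
#include <string>

namespace sketch {

// Stand-in mirroring ExtraFieldContext from extra_field_resolver.h.
struct ExtraFieldContext {
    const std::string& pod_name;
    int save_iter;
};

using ExtraFieldResolver = std::string (*)(const ExtraFieldContext&);

// Function-local static: the map is constructed on first use.
std::map<std::string, ExtraFieldResolver>& registry() {
    static std::map<std::string, ExtraFieldResolver> resolvers;
    return resolvers;
}

// Registering an existing name replaces the previous resolver.
void register_extra_field_resolver(const std::string& name, ExtraFieldResolver fn) {
    registry()[name] = fn;
}

// Hypothetical resolver: MY_ROUND is the save iteration as a decimal string.
std::string resolve_my_round(const ExtraFieldContext& ctx) {
    return std::to_string(ctx.save_iter);
}

// Registered names use their resolver; every other name falls back to getenv.
std::string get_extra_field_value(const std::string& name, const ExtraFieldContext& ctx) {
    auto it = registry().find(name);
    if (it != registry().end()) {
        return it->second(ctx);
    }
    const char* val = std::getenv(name.c_str());
    return (val && *val != '\0') ? std::string(val) : std::string();
}

}  // namespace sketch
```

After `sketch::register_extra_field_resolver("MY_ROUND", sketch::resolve_my_round)`, looking up `MY_ROUND` with `save_iter` equal to 3 returns `"3"`, while an unregistered name is read from the environment.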

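- **Integration with external systems**: when Megatrace is used as a library or submodule, do the following in your own .cc file:
  1. `#include "extra_field_resolver.h"`
  2. Call `megatrace::init_builtin_extra_resolvers()` before `log_writer_thread` starts (if it has not been initialized yet).
  3. Call `megatrace::register_extra_field_resolver("FIELD", resolver)` to register the custom field.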
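
Because `ExtraFieldResolver` is a plain function pointer, an external resolver has to be a free function or a non-capturing lambda. The sketch below re-declares stand-ins for the header's types so it compiles alone; the field it computes (the part of the pod name before the first `-`) and the name `pod_prefix_resolver` are hypothetical.

```cpp
#include <string>

// Stand-ins for the declarations in extra_field_resolver.h.
struct ExtraFieldContext {
    const std::string& pod_name;
    int save_iter;
};
using ExtraFieldResolver = std::string (*)(const ExtraFieldContext&);

// Hypothetical custom field: the pod name up to (not including) the first '-'.
// A non-capturing lambda converts implicitly to the function-pointer type;
// a capturing lambda would fail to compile here.
ExtraFieldResolver pod_prefix_resolver() {
    return [](const ExtraFieldContext& ctx) -> std::string {
        // find() returns npos when there is no '-', so substr keeps the whole name.
        return ctx.pod_name.substr(0, ctx.pod_name.find('-'));
    };
}
```

With the real header, the returned pointer could be passed as the second argument of `megatrace::register_extra_field_resolver`, under a field name of your choosing. State that a resolver needs beyond `ExtraFieldContext` would have to live in globals, or the context struct would need to be extended as its header comment suggests.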

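### 1.3.2 Complete steps for configuring a new field (built-in computed and non-built-in)

The following is the complete procedure for adding a new field from scratch, covering two cases: **non-built-in (environment variable)** fields and **built-in computed fields**.

1. **Decide the field name and meaning**
   - Pick a name that does not conflict with existing fields, e.g. `TRAIN_JOB_ID`, `CLUSTER_ID`, `MY_ROUND`.
   - The field value ends up in the log prefix in the form `[value]`.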

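2. **Configuring a non-built-in field (environment variable only)**
   1. Set the corresponding environment variable in the launch script / Pod configuration, e.g.:
      - `export CLUSTER_ID=cluster-a`
   2. In the same place, add the new field name to the comma-separated list in `MEGATRACE_LOG_EXTRA_FIELDS`, e.g.:
      - `export MEGATRACE_LOG_EXTRA_FIELDS=TRAIN_JOB_ID,CLUSTER_ID`
   3. Confirm that the log prefix looks like:
      - `[job-xxx] [cluster-a] [node_ip] [hostname] [save_count N] ...`
   4. If an environment variable is unset or empty, its `[..]` segment is not written.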

3. **Configuring a built-in computed field (requires code)**
   1. Add a resolver function in `extra_field_resolver.cc`, e.g.:
      - `std::string resolve_my_round(const ExtraFieldContext& ctx) { /* compute from ctx.pod_name / ctx.save_iter etc. */ }`
   2. Register it in `init_builtin_extra_resolvers()` in the same file:
      - `register_extra_field_resolver("MY_ROUND", resolve_my_round);`
   3. Rebuild the intercept .so (`nccl_intercept.so` / `rccl_intercept.so`).
   4. Set in the runtime environment:
      - `export MEGATRACE_LOG_EXTRA_FIELDS=TRAIN_JOB_ID,MY_ROUND,RUNNING_ROUND`
   5. After training starts, check that the log line prefix contains `[computed value of MY_ROUND]`.

4. **Mixed example (environment variables + built-in fields)**
   - Launch environment:
     - `export TRAIN_JOB_ID=job-xxx`
     - `export CLUSTER_ID=cluster-a`
     - `export MEGATRACE_LOG_EXTRA_FIELDS=TRAIN_JOB_ID,CLUSTER_ID,MY_ROUND,RUNNING_ROUND`
   - Example log prefix:
     - `[job-xxx] [cluster-a] [<MY_ROUND>] [<RUNNING_ROUND>] [node_ip] [hostname] [save_count N] ...`
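
To make the `<RUNNING_ROUND>` placeholder concrete, here is a standalone sketch of the rule implemented by `resolve_running_round` in this change. The function name `running_round_from_pod` is hypothetical; the logic follows the resolver, except that the redundant `npos` check is omitted.

```cpp
#include <algorithm>
#include <string>

// Sketch of the RUNNING_ROUND rule: the last '-'-separated segment of the pod
// name, or "0" when the name contains fewer than 8 dashes.
std::string running_round_from_pod(const std::string& pod_name) {
    auto dash_count = std::count(pod_name.begin(), pod_name.end(), '-');
    if (dash_count < 8) {
        return "0";
    }
    // At least 8 dashes exist, so rfind cannot return npos here.
    return pod_name.substr(pod_name.rfind('-') + 1);
}
```

For the pod name in the source comment, `job-34e2a67a-d3d5-43ef-9d3a-07e9986a7355-worker-1-8`, this returns `"8"`. A bare `worker-0-8` has only two dashes and therefore returns `"0"`.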

### 1.4 How node_ip is obtained

@@ -119,6 +167,6 @@

## 4. Extension and compatibility

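- **Writer side**: add new names to `MEGATRACE_LOG_EXTRA_FIELDS` (environment variables or built-in computed fields) and keep **node_ip and hostname as the last two segments**; the prefix can then be extended without changing the analysis code.
- **Writer side**: add new names to `MEGATRACE_LOG_EXTRA_FIELDS` (environment variables or built-in computed fields) and keep **node_ip and hostname as the last two segments**; the prefix can then be extended without changing the analysis code. Built-in computed fields are extended through the `extra_field_resolver` registry, decoupled from the line-writing loop.
- **Analysis side**: does not parse or depend on the contents of the optional prefix; as long as the "last two segments + save_count + message body" format is unchanged, parsing stays compatible.
- **Recommendation**: avoid `]` in custom prefix field values, since it breaks parsing of the `[..]` segments; if it must appear, escape it on the writer side or agree on a new rule with the analysis side.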
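
If a value must contain `]`, one possible writer-side convention is backslash escaping, sketched below. This rule is an assumption for illustration only: Megatrace does not define an escaping scheme in this change (values are written verbatim with `fprintf(fp, "[%s] ", ...)`), and the analysis side would need to implement the matching unescape step.

```cpp
#include <string>

// Hypothetical escaping rule (not part of Megatrace): prefix '\' and ']' with a
// backslash so a reader can tell a literal ']' from the end of a segment.
std::string escape_field_value(const std::string& value) {
    std::string out;
    out.reserve(value.size());
    for (char c : value) {
        if (c == '\\' || c == ']') {
            out.push_back('\\');
        }
        out.push_back(c);
    }
    return out;
}
```

A resolver could return `escape_field_value(raw)` instead of `raw`; values without `]` or `\` pass through unchanged.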
Binary file modified trace/analysis/__pycache__/analysis.cpython-313.pyc
Binary file not shown.
Binary file modified trace/analysis/__pycache__/analysis.cpython-314.pyc
Binary file not shown.
Binary file not shown.
Binary file modified trace/analysis/__pycache__/log_reader.cpython-314.pyc
Binary file not shown.
4 changes: 2 additions & 2 deletions trace/intercept/build.sh
@@ -77,9 +77,9 @@ fi

# Select source files based on GPU vendor
if [ "$GPU_VENDOR" = "nvidia" ] || [ "$GPU_VENDOR" = "cuda" ]; then
    SOURCES="nccl_intercept.cc intercept_common.cc ring_log.cc stream_watchdog.cc"
    SOURCES="nccl_intercept.cc intercept_common.cc ring_log.cc extra_field_resolver.cc stream_watchdog.cc"
elif [ "$GPU_VENDOR" = "amd" ] || [ "$GPU_VENDOR" = "rocm" ] || [ "$GPU_VENDOR" = "hip" ]; then
    SOURCES="rccl_intercept.cc intercept_common.cc ring_log.cc"
    SOURCES="rccl_intercept.cc intercept_common.cc ring_log.cc extra_field_resolver.cc"
else
    echo "Error: Unknown GPU vendor: $GPU_VENDOR"
    exit 1
53 changes: 53 additions & 0 deletions trace/intercept/extra_field_resolver.cc
@@ -0,0 +1,53 @@
#include "extra_field_resolver.h"
#include <stdlib.h>
#include <cstring>
#include <algorithm>
#include <map>

namespace megatrace {

static std::map<std::string, ExtraFieldResolver>* g_resolvers = nullptr;

static std::map<std::string, ExtraFieldResolver>& get_resolvers() {
    if (g_resolvers == nullptr) {
        g_resolvers = new std::map<std::string, ExtraFieldResolver>();
    }
    return *g_resolvers;
}

void register_extra_field_resolver(const std::string& name, ExtraFieldResolver resolver) {
    get_resolvers()[name] = resolver;
}

static std::string resolve_running_round(const ExtraFieldContext& ctx) {
    const std::string& pod_name = ctx.pod_name;
    int dash_count = std::count(pod_name.begin(), pod_name.end(), '-');
    // example pod name job-34e2a67a-d3d5-43ef-9d3a-07e9986a7355-worker-1-8
    if (dash_count < 8) {
        return "0";
    }
    size_t last_dash_pos = pod_name.find_last_of('-');
    if (last_dash_pos == std::string::npos) {
        return "0";
    }
    return pod_name.substr(last_dash_pos + 1);
}

void init_builtin_extra_resolvers() {
    static bool initialized = false;
    if (initialized) return;
    initialized = true;
    register_extra_field_resolver("RUNNING_ROUND", resolve_running_round);
}

std::string get_extra_field_value(const std::string& name, const ExtraFieldContext& ctx) {
    auto& resolvers = get_resolvers();
    auto it = resolvers.find(name);
    if (it != resolvers.end()) {
        return it->second(ctx);
    }
    const char* val = getenv(name.c_str());
    return (val && *val != '\0') ? std::string(val) : std::string();
}

} // namespace megatrace
44 changes: 44 additions & 0 deletions trace/intercept/extra_field_resolver.h
@@ -0,0 +1,44 @@
#ifndef MEGATRACE_EXTRA_FIELD_RESOLVER_H
#define MEGATRACE_EXTRA_FIELD_RESOLVER_H

#include <string>

namespace megatrace {

/**
 * Context passed to extra field resolvers.
 * Extend this struct when adding resolvers that need more context.
 */
struct ExtraFieldContext {
    const std::string& pod_name;
    int save_iter;
};

/**
 * Resolver type: (context) -> value string.
 * Return empty string to skip output for this field.
 */
using ExtraFieldResolver = std::string (*)(const ExtraFieldContext&);

/**
 * Register a built-in computed field resolver.
 * Call before log_writer_thread runs (e.g. at start of log_writer_thread).
 * If name is already registered, the new resolver replaces the old one.
 */
void register_extra_field_resolver(const std::string& name, ExtraFieldResolver resolver);

/**
 * Get value for field name: if registered as built-in, call resolver; else use getenv(name).
 * Returns value string; empty means skip output for this field.
 */
std::string get_extra_field_value(const std::string& name, const ExtraFieldContext& ctx);

/**
 * Initialize built-in resolvers (RUNNING_ROUND, etc.).
 * Called internally by log_writer_thread; external code may call to ensure built-ins exist before adding custom resolvers.
 */
void init_builtin_extra_resolvers();

} // namespace megatrace

#endif /* MEGATRACE_EXTRA_FIELD_RESOLVER_H */
31 changes: 8 additions & 23 deletions trace/intercept/ring_log.cc
@@ -1,6 +1,7 @@
#define MEGA_CC
#include "ring_log.h"
#include "log.h"
#include "extra_field_resolver.h"
#include <sys/un.h>
#include <utime.h>
#include <iostream>
@@ -102,19 +103,6 @@ int ring_buffer_pop_batch(ring_buffer_t *rb, log_entry_t *out_entries, int max_e
    return count;
}

std::string get_running_round(const std::string& pod_name) {
    int dash_count = std::count(pod_name.begin(), pod_name.end(), '-');
    // example pod name job-34e2a67a-d3d5-43ef-9d3a-07e9986a7355-worker-1-8
    if (dash_count < 8) {
        return "0";
    }
    size_t last_dash_pos = pod_name.find_last_of('-');
    if (last_dash_pos == std::string::npos) {
        return "0";
    }
    return pod_name.substr(last_dash_pos + 1);
}

/*
* Rotate log files: rename existing files with version suffix and drop the oldest.
* filename: current log file (full path)
@@ -198,7 +186,9 @@ void *log_writer_thread(void *arg) {
if (pod_name == NULL) {
    pod_name = "unknown";
}
std::string running_round = get_running_round(pod_name);
std::string pod_name_str(pod_name);

megatrace::init_builtin_extra_resolvers();

/* Prefer MY_POD_IP (K8s); fallback to primary non-loopback IPv4 for Docker / bare metal */
char node_ip_buf[64];
@@ -312,17 +302,12 @@ void *log_writer_thread(void *arg) {
save_iter++;
LOG_INFO("[save %d] save %d logs",save_iter,num_logs);
int n_logs = ring_buffer_pop_batch(&ring_nccl_log, logs, num_logs);
megatrace::ExtraFieldContext ctx{pod_name_str, save_iter};
for (int i = 0; i < n_logs; i++) {
    /* Optional extra prefix: from MEGATRACE_LOG_EXTRA_FIELDS; each name is either a built-in computed field or an env var name */
    for (const auto& name : extra_field_names) {
        const char* val = NULL;
        if (name == "RUNNING_ROUND") {
            val = running_round.c_str();
        } else {
            val = getenv(name.c_str());
        }
        if (val && *val != '\0')
            fprintf(fp, "[%s] ", val);
        std::string val = megatrace::get_extra_field_value(name, ctx);
        if (!val.empty())
            fprintf(fp, "[%s] ", val.c_str());
    }
    fprintf(fp, "[%s] [%s] [save_count %d] %s\n", node_ip, hostname, save_iter, logs[i].msg);
}