diff --git a/trace/analysis/doc/LOG_FORMAT_AND_PARSING.md b/LOG_FORMAT_AND_PARSING.md similarity index 63% rename from trace/analysis/doc/LOG_FORMAT_AND_PARSING.md rename to LOG_FORMAT_AND_PARSING.md index cf67da7..7dfc8ac 100644 --- a/trace/analysis/doc/LOG_FORMAT_AND_PARSING.md +++ b/LOG_FORMAT_AND_PARSING.md @@ -36,11 +36,59 @@ - **格式**:逗号分隔的**字段名**,例如:`TRAIN_JOB_ID,RUNNING_ROUND`。 - **取值规则**: - - **环境变量**:名字在列表中且非“内置计算字段”时,使用 `getenv(名字)`;未设置或空则不输出该段。 - - **内置计算字段**:当前支持 `RUNNING_ROUND`,由 `get_running_round(POD)` 计算(POD 名最后一段,如 `worker-0-8` → `8`),不读环境变量 `RUNNING_ROUND`。 + - **环境变量**:名字未注册为内置计算字段时,使用 `getenv(名字)`;未设置或空则不输出该段。 + - **内置计算字段**:当前支持 `RUNNING_ROUND`,由 resolver 根据 POD 名计算(POD 名最后一段,如 `worker-0-8` → `8`),不读环境变量 `RUNNING_ROUND`。 - **输出顺序**:与配置顺序一致,全部在 `[node_ip] [hostname]` 之前。 -扩展新的“计算字段”时,在 ring_log 写行逻辑里按名字分支,写入对应计算值即可。 +### 1.3.1 扩展内置计算字段(解耦设计) + +计算逻辑已从写行循环中解耦,采用**注册表**方式。新增内置字段时无需修改 `ring_log.cc` 的循环。 + +- **实现位置**:`trace/intercept/extra_field_resolver.h`、`extra_field_resolver.cc` +- **扩展步骤(新增一个内置计算字段,如 `MY_ROUND`)**: + 1. 在 `extra_field_resolver.cc` 中实现 resolver 函数,例如:`std::string resolve_my_round(const ExtraFieldContext& ctx)`,从 `ctx.pod_name`、`ctx.save_iter` 等上下文计算字符串。 + 2. 在 `init_builtin_extra_resolvers()` 中注册:`register_extra_field_resolver("MY_ROUND", resolve_my_round)`。 + 3. 重新编译拦截 .so,并在运行环境中配置 `MEGATRACE_LOG_EXTRA_FIELDS=...,MY_ROUND,...`。 + +- **外部系统集成**:若将 Megatrace 作为库或子模块使用,可在自己的 .cc 中: + 1. `#include "extra_field_resolver.h"` + 2. 在 `log_writer_thread` 启动前调用 `megatrace::init_builtin_extra_resolvers()`(若尚未初始化)。 + 3. 调用 `megatrace::register_extra_field_resolver("FIELD", resolver)` 注册自定义字段。 + +### 1.3.2 配置新字段的完整步骤(内置计算 & 非内置) + +下面给出从零开始“加一个新字段”的完整流程,区分**非内置(环境变量)**与**内置计算字段**两种情况。 + +1. **确定字段名与含义** + - 约定一个不会与现有字段冲突的名字,例如:`TRAIN_JOB_ID`、`CLUSTER_ID`、`MY_ROUND`。 + - 字段值最终会以 `[value]` 形式出现在日志前缀中。 + +2. **非内置计算字段(仅环境变量)的配置步骤** + 1. 在启动脚本 / Pod 配置中设置对应环境变量,例如: + - `export CLUSTER_ID=cluster-a` + 2. 在同一处配置 `MEGATRACE_LOG_EXTRA_FIELDS`,将新字段名加入逗号列表,例如: + - `export MEGATRACE_LOG_EXTRA_FIELDS=TRAIN_JOB_ID,CLUSTER_ID` + 3. 确认日志中前缀形如: + - `[job-xxx] [cluster-a] [node_ip] [hostname] [save_count N] ...` + 4. 若某个环境变量未设置或为空,该字段对应的 `[..]` 不会输出。 + +3. **内置计算字段(需要代码逻辑)的配置步骤** + 1. 在 `extra_field_resolver.cc` 中新增 resolver 函数,例如: + - `std::string resolve_my_round(const ExtraFieldContext& ctx) { /* 基于 ctx.pod_name / ctx.save_iter 等计算 */ }` + 2. 在同文件的 `init_builtin_extra_resolvers()` 中注册: + - `register_extra_field_resolver("MY_ROUND", resolve_my_round);` + 3. 重新编译生成拦截 .so(`nccl_intercept.so` / `rccl_intercept.so`)。 + 4. 在运行环境中设置: + - `export MEGATRACE_LOG_EXTRA_FIELDS=TRAIN_JOB_ID,MY_ROUND,RUNNING_ROUND` + 5. 启动训练后,检查日志行前缀中已出现 `[MY_ROUND 的计算结果]`。 + +4. **混合使用示例(环境变量 + 内置字段)** + - 启动环境: + - `export TRAIN_JOB_ID=job-xxx` + - `export CLUSTER_ID=cluster-a` + - `export MEGATRACE_LOG_EXTRA_FIELDS=TRAIN_JOB_ID,CLUSTER_ID,MY_ROUND,RUNNING_ROUND` + - 日志前缀示例: + - `[job-xxx] [cluster-a] [] [] [node_ip] [hostname] [save_count N] ...` ### 1.4 node_ip 获取方式 @@ -119,6 +167,6 @@ ## 四、扩展与兼容 -- **写端**:在 `MEGATRACE_LOG_EXTRA_FIELDS` 中增加新名字(环境变量或内置计算字段),并保证 **node_ip、hostname 仍为最后两段**,即可扩展前缀,无需改分析。 +- **写端**:在 `MEGATRACE_LOG_EXTRA_FIELDS` 中增加新名字(环境变量或内置计算字段),并保证 **node_ip、hostname 仍为最后两段**,即可扩展前缀,无需改分析。内置计算字段通过 `extra_field_resolver` 注册表扩展,与写行循环解耦。 - **分析端**:不解析、不依赖可选前缀内容;只要“最后两段 + save_count + 消息体”格式不变,解析保持兼容。 - **建议**:自定义前缀字段的值中避免包含 `]`,以免破坏 `[..]` 段解析;若必须包含,需在写端转义或与解析端约定新规则。 diff --git a/trace/analysis/__pycache__/analysis.cpython-313.pyc b/trace/analysis/__pycache__/analysis.cpython-313.pyc index 5a819fe..1b5f3ce 100644 Binary files a/trace/analysis/__pycache__/analysis.cpython-313.pyc and b/trace/analysis/__pycache__/analysis.cpython-313.pyc differ diff --git a/trace/analysis/__pycache__/analysis.cpython-314.pyc b/trace/analysis/__pycache__/analysis.cpython-314.pyc index 3c85b5c..14f28b6 100644 Binary files a/trace/analysis/__pycache__/analysis.cpython-314.pyc and b/trace/analysis/__pycache__/analysis.cpython-314.pyc differ diff --git a/trace/analysis/__pycache__/group_hash_slow_detector.cpython-314.pyc b/trace/analysis/__pycache__/group_hash_slow_detector.cpython-314.pyc index d3b6644..558a742 100644 Binary files a/trace/analysis/__pycache__/group_hash_slow_detector.cpython-314.pyc and b/trace/analysis/__pycache__/group_hash_slow_detector.cpython-314.pyc differ diff --git a/trace/analysis/__pycache__/log_reader.cpython-314.pyc b/trace/analysis/__pycache__/log_reader.cpython-314.pyc index 1d43eaa..5591297 100644 Binary files a/trace/analysis/__pycache__/log_reader.cpython-314.pyc and b/trace/analysis/__pycache__/log_reader.cpython-314.pyc differ diff --git a/trace/intercept/build.sh b/trace/intercept/build.sh index bf72547..8055b08 100644 --- a/trace/intercept/build.sh +++ b/trace/intercept/build.sh @@ -77,9 +77,9 @@ fi # Select source files based on GPU vendor if [ "$GPU_VENDOR" = "nvidia" ] || [ "$GPU_VENDOR" = "cuda" ]; then - SOURCES="nccl_intercept.cc intercept_common.cc ring_log.cc stream_watchdog.cc" + SOURCES="nccl_intercept.cc intercept_common.cc ring_log.cc extra_field_resolver.cc stream_watchdog.cc" elif [ "$GPU_VENDOR" = "amd" ] || [ "$GPU_VENDOR" = "rocm" ] || [ "$GPU_VENDOR" = "hip" ]; then - SOURCES="rccl_intercept.cc intercept_common.cc ring_log.cc" + SOURCES="rccl_intercept.cc intercept_common.cc ring_log.cc extra_field_resolver.cc" else echo "Error: Unknown GPU vendor: $GPU_VENDOR" exit 1 diff --git a/trace/intercept/extra_field_resolver.cc b/trace/intercept/extra_field_resolver.cc new file mode 100644 index 0000000..5f49025 --- /dev/null +++ b/trace/intercept/extra_field_resolver.cc @@ -0,0 +1,53 @@ +#include "extra_field_resolver.h" +#include +#include +#include +#include + +namespace megatrace { + +static std::map* g_resolvers = nullptr; + +static std::map& get_resolvers() { + if (g_resolvers == nullptr) { + g_resolvers = new std::map(); + } + return *g_resolvers; +} + +void register_extra_field_resolver(const std::string& name, ExtraFieldResolver resolver) { + get_resolvers()[name] = resolver; +} + +static std::string resolve_running_round(const ExtraFieldContext& ctx) { + const std::string& pod_name = ctx.pod_name; + int dash_count = std::count(pod_name.begin(), pod_name.end(), '-'); + // example pod name job-34e2a67a-d3d5-43ef-9d3a-07e9986a7355-worker-1-8 + if (dash_count < 8) { + return "0"; + } + size_t last_dash_pos = pod_name.find_last_of('-'); + if (last_dash_pos == std::string::npos) { + return "0"; + } + return pod_name.substr(last_dash_pos + 1); +} + +void init_builtin_extra_resolvers() { + static bool initialized = false; + if (initialized) return; + initialized = true; + register_extra_field_resolver("RUNNING_ROUND", resolve_running_round); +} + +std::string get_extra_field_value(const std::string& name, const ExtraFieldContext& ctx) { + auto& resolvers = get_resolvers(); + auto it = resolvers.find(name); + if (it != resolvers.end()) { + return it->second(ctx); + } + const char* val = getenv(name.c_str()); + return (val && *val != '\0') ? std::string(val) : std::string(); +} + +} // namespace megatrace diff --git a/trace/intercept/extra_field_resolver.h b/trace/intercept/extra_field_resolver.h new file mode 100644 index 0000000..0abe74a --- /dev/null +++ b/trace/intercept/extra_field_resolver.h @@ -0,0 +1,44 @@ +#ifndef MEGATRACE_EXTRA_FIELD_RESOLVER_H +#define MEGATRACE_EXTRA_FIELD_RESOLVER_H + +#include + +namespace megatrace { + +/** + * Context passed to extra field resolvers. + * Extend this struct when adding resolvers that need more context. + */ +struct ExtraFieldContext { + const std::string& pod_name; + int save_iter; +}; + +/** + * Resolver type: (context) -> value string. + * Return empty string to skip output for this field. + */ +using ExtraFieldResolver = std::string (*)(const ExtraFieldContext&); + +/** + * Register a built-in computed field resolver. + * Call before log_writer_thread runs (e.g. at start of log_writer_thread). + * If name is already registered, the new resolver replaces the old one. + */ +void register_extra_field_resolver(const std::string& name, ExtraFieldResolver resolver); + +/** + * Get value for field name: if registered as built-in, call resolver; else use getenv(name). + * Returns value string; empty means skip output for this field. + */ +std::string get_extra_field_value(const std::string& name, const ExtraFieldContext& ctx); + +/** + * Initialize built-in resolvers (RUNNING_ROUND, etc.). + * Called internally by log_writer_thread; external code may call to ensure built-ins exist before adding custom resolvers. + */ +void init_builtin_extra_resolvers(); + +} // namespace megatrace + +#endif /* MEGATRACE_EXTRA_FIELD_RESOLVER_H */ diff --git a/trace/intercept/ring_log.cc b/trace/intercept/ring_log.cc index 17fd310..9b966fd 100644 --- a/trace/intercept/ring_log.cc +++ b/trace/intercept/ring_log.cc @@ -1,6 +1,7 @@ #define MEGA_CC #include "ring_log.h" #include "log.h" +#include "extra_field_resolver.h" #include #include #include @@ -102,19 +103,6 @@ int ring_buffer_pop_batch(ring_buffer_t *rb, log_entry_t *out_entries, int max_e return count; } -std::string get_running_round(const std::string& pod_name) { - int dash_count = std::count(pod_name.begin(), pod_name.end(), '-'); - // example pod name job-34e2a67a-d3d5-43ef-9d3a-07e9986a7355-worker-1-8 - if (dash_count < 8) { - return "0"; - } - size_t last_dash_pos = pod_name.find_last_of('-'); - if (last_dash_pos == std::string::npos) { - return "0"; - } - return pod_name.substr(last_dash_pos + 1); -} - /* * Rotate log files: rename existing files with version suffix and drop the oldest. * filename: current log file (full path) @@ -198,7 +186,9 @@ void *log_writer_thread(void *arg) { if (pod_name == NULL) { pod_name = "unknown"; } - std::string running_round = get_running_round(pod_name); + std::string pod_name_str(pod_name); + + megatrace::init_builtin_extra_resolvers(); /* Prefer MY_POD_IP (K8s); fallback to primary non-loopback IPv4 for Docker / bare metal */ char node_ip_buf[64]; @@ -312,17 +302,12 @@ void *log_writer_thread(void *arg) { save_iter++; LOG_INFO("[save %d] save %d logs",save_iter,num_logs); int n_logs = ring_buffer_pop_batch(&ring_nccl_log, logs, num_logs); + megatrace::ExtraFieldContext ctx{pod_name_str, save_iter}; for (int i = 0; i < n_logs; i++) { - /* Optional extra prefix: from MEGATRACE_LOG_EXTRA_FIELDS; each name is either a built-in computed field or an env var name */ for (const auto& name : extra_field_names) { - const char* val = NULL; - if (name == "RUNNING_ROUND") { - val = running_round.c_str(); - } else { - val = getenv(name.c_str()); - } - if (val && *val != '\0') - fprintf(fp, "[%s] ", val); + std::string val = megatrace::get_extra_field_value(name, ctx); + if (!val.empty()) + fprintf(fp, "[%s] ", val.c_str()); } fprintf(fp, "[%s] [%s] [save_count %d] %s\n", node_ip, hostname, save_iter, logs[i].msg); }