Skip to content

Commit 150abf1

Browse files
CodeCasterX and claude committed
fix: 修复 Fork.join() 并发场景下的 null 数据处理 (#247)
问题根因:在并发场景下,Fork.join() 的 reducer 接收到 input.getData() = null,导致 NPE 或数据丢失("Required parameters are missing")。

修复方案(阶段1):
- Fork.java: 添加智能 null 处理,跳过 null 分支避免崩溃
- 使用 Logger.warn() 记录异常情况,便于监控
- 清理所有 System.err 诊断代码
- Tip.merge(): 保留防御性 null 检查

技术细节:
- 当 inputData 为 null 时,记录警告日志并跳过此分支
- 如果是最后一个分支,返回已有数据(避免整个流程失败)
- 保留 Tip.merge() 的 null 检查作为额外防御层

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent a8d9b99 commit 150abf1

File tree

3 files changed

+22
-51
lines changed
  • framework
    • fel/java
      • fel-core/src/main/java/modelengine/fel/core/util
      • fel-flow/src/main/java/modelengine/fel/engine/activities
    • waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states

3 files changed

+22
-51
lines changed

framework/fel/java/fel-core/src/main/java/modelengine/fel/core/util/Tip.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -118,31 +118,11 @@ public Tip addAll(Map<String, Content> args) {
118118
* @return 表示当前的 {@link Tip}。
119119
*/
120120
public Tip merge(Tip other) {
121-
// === DIAGNOSTIC #5: Tip.merge() 开始 ===
122-
System.err.println(String.format(
123-
"[DIAG-Tip.merge-START] thread=%s, this=%s, other=%s, other_is_null=%b",
124-
Thread.currentThread().getName(), this, other, (other == null)
125-
));
126-
127-
// 防御性处理:在并发场景下,Fork.join() 可能传入 null
128-
// 参考:https://github.com/ModelEngine-Group/fit-framework/issues/247
121+
// Issue #247: 防御性处理,在并发场景下 Fork.join() 可能传入 null
129122
if (other == null) {
130-
System.err.println(String.format(
131-
"[DIAG-Tip.merge-NULL] thread=%s, other is null, returning this=%s",
132-
Thread.currentThread().getName(), this
133-
));
134123
return this;
135124
}
136-
137-
Tip result = this.addAll(other.values);
138-
139-
// === DIAGNOSTIC #6: Tip.merge() 结束 ===
140-
System.err.println(String.format(
141-
"[DIAG-Tip.merge-END] thread=%s, result=%s",
142-
Thread.currentThread().getName(), result
143-
));
144-
145-
return result;
125+
return this.addAll(other.values);
146126
}
147127

148128
/**

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -594,23 +594,7 @@ public final AiState<Tip, D, Tip, RF, F> runnableParallel(Pattern<O, Tip>... pat
594594
.orElseGet(() -> new AiParallel<>(this.start.parallel(), mineFlow).fork(branchProcessor));
595595
}
596596

597-
AiState<Tip, D, Tip, RF, F> state = aiFork.join(Tip::new, (acc, data) -> {
598-
// === DIAGNOSTIC #3: AiStart reducer 调用 merge 之前 ===
599-
System.err.println(String.format(
600-
"[DIAG-AiStart:605-BEFORE] thread=%s, acc=%s, data=%s, data_is_null=%b",
601-
Thread.currentThread().getName(), acc, data, (data == null)
602-
));
603-
604-
Tip mergeResult = acc.merge(data); // Tip.merge() 内部会处理 null
605-
606-
// === DIAGNOSTIC #4: AiStart reducer 调用 merge 之后 ===
607-
System.err.println(String.format(
608-
"[DIAG-AiStart:605-AFTER] thread=%s, mergeResult=%s",
609-
Thread.currentThread().getName(), mergeResult
610-
));
611-
612-
return mergeResult;
613-
});
597+
AiState<Tip, D, Tip, RF, F> state = aiFork.join(Tip::new, (acc, data) -> acc.merge(data));
614598
((Processor<?, ?>) state.publisher()).displayAs("runnableParallel");
615599
return state;
616600
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import modelengine.fit.waterflow.domain.stream.reactive.Processor;
1313
import modelengine.fit.waterflow.domain.stream.reactive.Publisher;
1414
import modelengine.fit.waterflow.domain.utils.Tuple;
15+
import modelengine.fitframework.log.Logger;
1516
import modelengine.fitframework.util.ObjectUtils;
1617

1718
import java.util.ArrayList;
@@ -33,6 +34,8 @@
3334
* @since 1.0
3435
*/
3536
public class Fork<O, D, I, F extends Flow<D>> extends Activity<D, F> {
37+
private static final Logger LOG = Logger.get(Fork.class);
38+
3639
private final State<I, D, I, F> node;
3740

3841
private final List<State<O, D, ?, F>> forks = new ArrayList<>();
@@ -94,21 +97,25 @@ public synchronized R process(FlowContext<O> input) {
9497
}
9598
}
9699

97-
// === DIAGNOSTIC #1: Fork.join wrapper 调用 processor 之前 ===
100+
// Issue #247: 智能处理并发场景下的 null 数据
101+
// 在某些竞态条件下,FlowContext.data 可能为 null
98102
O inputData = input.getData();
99-
System.err.println(String.format(
100-
"[DIAG-Fork:96-BEFORE] key=%s, thread=%s, branchCount=%d/%d, acc.first=%s, input.getData=%s, input.getData_is_null=%b",
101-
key, Thread.currentThread().getName(), acc.second(), forkNumber.get(),
102-
acc.first(), inputData, (inputData == null)
103-
));
103+
if (inputData == null) {
104+
LOG.warn("[Fork.join] Received null FlowContext.data. "
105+
+ "key={}, session={}, thread={}, branch={}/{}, acc={}",
106+
key, input.getSession().getId(), Thread.currentThread().getName(),
107+
acc.second() + 1, forkNumber.get(), acc.first());
104108

105-
R processedResult = processor.process(acc.first(), inputData);
109+
// 跳过此分支,不更新累加器
110+
// 如果是最后一个分支,返回已有数据(避免整个流程失败)
111+
if (acc.second() + 1 == forkNumber.get()) {
112+
accs.remove(key);
113+
return acc.first();
114+
}
115+
return null;
116+
}
106117

107-
// === DIAGNOSTIC #2: Fork.join wrapper 调用 processor 之后 ===
108-
System.err.println(String.format(
109-
"[DIAG-Fork:96-AFTER] key=%s, thread=%s, processedResult=%s, processedResult_is_null=%b",
110-
key, Thread.currentThread().getName(), processedResult, (processedResult == null)
111-
));
118+
R processedResult = processor.process(acc.first(), inputData);
112119

113120
acc = Tuple.from(processedResult, acc.second() + 1);
114121
accs.put(key, acc);

0 commit comments

Comments
 (0)