Refactor: move fanout wiring to orchestrator, drop wiring queue #1264
ChaoWao wants to merge 1 commit into
…ueue

The orchestrator now wires each task's fanout edges inline at submit time (dep_pool.ensure_space + wire_task) instead of pushing to a deferred wiring queue drained by scheduler thread 0. The whole wiring-queue subsystem is deleted: PTO2SpscQueue, WiringState, drain_wiring_queue, report_wiring_deadlock, producer_blocked, orch_needs_drain, dep_deadlock_reported, and the Wire swimlane phase.

- dep_pool becomes orchestrator-exclusive (single writer), so it needs no lock; the only cross-thread serialization is the existing per-producer fanout_lock, shared with the scheduler's completion handler.
- Forward-progress safety moves from the S-side structural wiring-deadlock detector to PTO2DepListPool::ensure_space's existing no-progress overflow detector (same PTO2_ERROR_DEP_POOL_OVERFLOW fatal), mirroring the fanin_pool.ensure_space already used on the submit path.
- wait_for_tensor_ready's orch_needs_drain signal is removed: with O wiring every task at submit time, the producer it waits on is already wired, and the scheduler dispatch loop is a busy poll that needs no "drain now" nudge.

Docs (RUNTIME_LOGIC.md, MULTI_RING.md, SUBMIT_BY_CLUSTER.md) updated to the O-side wiring model. test_spsc_queue.cpp (a2a3) removed; the drain_wiring_queue cases dropped from test_wiring.cpp (wire_task cases retained).

Testing: a2a3sim tensormap_and_ringbuffer suite, 30 passed, 1 skipped.
📝 Walkthrough
This PR removes the scheduler's SPSC-based wiring queue subsystem and moves fanout wiring inline into the orchestrator's task submission path via a direct
Changes
Inline Fanout Wiring Replaces Scheduler Wiring Queue
Estimated code review effort: 4 (Complex) | ~60 minutes
Sequence Diagram(s)
sequenceDiagram
participant Orchestrator as PTO2OrchestratorState
participant DepPool as dep_pool
participant Scheduler as sched (wire_task)
participant ReadyQueue as push_ready_routed
Orchestrator->>DepPool: ensure_space(fanin_actual_count)
DepPool-->>Orchestrator: PTO2_ERROR_DEP_POOL_OVERFLOW (fatal on failure)
Orchestrator->>Scheduler: wire_task(rss, slot_state, wfanin)
Scheduler->>ReadyQueue: push_ready_routed(slot) when ready
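The order of operations in the diagram can be sketched in a few lines of C++. This is a toy model only: `ToyPool`, `ToyTask`, `SubmitStatus`, and `submit` are hypothetical names, the pool check is a plain capacity comparison (the real `ensure_space` is described as blocking and reclaiming before it reports overflow), and readiness is reduced to a counter comparison.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy stand-ins; none of these are the real PTO2 types.
enum class SubmitStatus { Queued, Ready, DepPoolOverflow };

struct ToyPool {
    int32_t capacity;
    int32_t used = 0;
    // Capacity check only: true when `n` more entries fit.
    bool ensure_space(int32_t n) const { return n >= 0 && used + n <= capacity; }
    void take(int32_t n) { used += n; }
};

struct ToyTask {
    int32_t fanin_actual_count;  // producers this task depends on
    int32_t fanin_done;          // producers that have already completed
};

// Same order as the diagram: ensure_space, then wire, then route if ready.
SubmitStatus submit(ToyPool& pool, const ToyTask& task,
                    std::vector<int>& ready_queue, int slot) {
    const int32_t wfanin = task.fanin_actual_count;
    if (wfanin > 0 && !pool.ensure_space(wfanin)) {
        return SubmitStatus::DepPoolOverflow;  // the PR treats this as fatal
    }
    pool.take(wfanin > 0 ? wfanin : 0);  // assumption: one entry per fanin edge
    if (task.fanin_done >= wfanin) {
        ready_queue.push_back(slot);     // stands in for push_ready_routed
        return SubmitStatus::Ready;
    }
    return SubmitStatus::Queued;
}
```

A task with no fanin goes straight to the ready queue; a task whose edges do not fit leaves the pool untouched and reports overflow.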
🚥 Pre-merge checks: ✅ 5 passed
Code Review
This pull request refactors the fanout wiring mechanism by removing the deferred SPSC wiring queue and performing fanout wiring inline on the orchestrator side at submit time. This change simplifies the scheduler by removing the wire phase, the PTO2SpscQueue structure, and its associated tests. Feedback on the changes highlights two high-severity issues where signed integer parameters (wfanin and dep_pool_capacities[r]) are used in memory operations or cast to size_t without validating that they are non-negative, which could lead to out-of-bounds writes or signed-to-unsigned conversion overflows.
int32_t wfanin = payload.fanin_actual_count;
if (wfanin > 0 && !rss.dep_pool.ensure_space(*rss.ring, wfanin)) {
    orch_mark_fatal(orch, PTO2_ERROR_DEP_POOL_OVERFLOW);
    return result;
}
sched->wiring.producer_blocked.store(0, std::memory_order_release);
sched->wire_task(rss, &cur_slot_state, wfanin);
The signed 32-bit integer wfanin (derived from payload.fanin_actual_count) is used as a count parameter for ensure_space and wire_task without validation that it is non-negative. If wfanin is negative, the wfanin > 0 check is bypassed, but wire_task is still called with a negative value. Inside wire_task, wfanin != 0 is true, which will attempt to allocate entries from dep_pool via dep_pool.prepend without having reserved space, potentially causing dependency pool corruption or out-of-bounds writes.
Per defensive programming guidelines, always validate that signed count parameters are non-negative before using them in bounds checks or memory operations.
int32_t wfanin = payload.fanin_actual_count;
always_assert(wfanin >= 0);
if (wfanin > 0 && !rss.dep_pool.ensure_space(*rss.ring, wfanin)) {
    orch_mark_fatal(orch, PTO2_ERROR_DEP_POOL_OVERFLOW);
    return result;
}
sched->wire_task(rss, &cur_slot_state, wfanin);
References
- When using a signed integer for a count or size parameter in functions that perform bounds checks and memory operations (such as memcpy or array indexing), always validate that the count is non-negative (e.g., count >= 0) to prevent bypassing bounds checks and causing out-of-bounds writes or signed-to-unsigned conversion overflows.
layout.off_dep_pool_entries[r] =
    arena.reserve(static_cast<size_t>(dep_pool_capacities[r]) * sizeof(PTO2DepListEntry), PTO2_ALIGN_SIZE);
The signed 32-bit integer dep_pool_capacities[r] is cast to size_t and used as a size parameter in arena.reserve without validating that it is non-negative. If dep_pool_capacities[r] is negative, casting it to size_t will cause a signed-to-unsigned conversion overflow, resulting in an extremely large allocation request or integer overflow in the multiplication, potentially bypassing bounds checks and causing heap corruption or out-of-bounds writes.
Per defensive programming guidelines, always validate that signed count/size parameters are non-negative before using them in memory operations.
always_assert(dep_pool_capacities[r] >= 0);
layout.off_dep_pool_entries[r] =
    arena.reserve(static_cast<size_t>(dep_pool_capacities[r]) * sizeof(PTO2DepListEntry), PTO2_ALIGN_SIZE);
References
- When using a signed integer for a count or size parameter in functions that perform bounds checks and memory operations (such as memcpy or array indexing), always validate that the count is non-negative (e.g., count >= 0) to prevent bypassing bounds checks and causing out-of-bounds writes or signed-to-unsigned conversion overflows.
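To make the conversion hazard concrete, here is a small standalone sketch. `checked_bytes` is a hypothetical helper, not something in the PR or the codebase; it only illustrates rejecting a negative count and an overflowing product before the value reaches an allocator.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical helper (not part of the PR): byte size for `count` entries,
// or std::nullopt when the count is negative or the product would overflow.
inline std::optional<std::size_t> checked_bytes(int32_t count, std::size_t entry_size) {
    if (count < 0) {
        return std::nullopt;  // a negative count would wrap to a huge size_t
    }
    const auto n = static_cast<std::size_t>(count);
    if (entry_size != 0 && n > std::numeric_limits<std::size_t>::max() / entry_size) {
        return std::nullopt;  // n * entry_size would not fit in size_t
    }
    return n * entry_size;
}
```

Converting `int32_t{-1}` to `size_t` is well defined and yields the maximum `size_t` value, which is why the unchecked cast turns a small negative capacity into an enormous reservation request.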
🧹 Nitpick comments (2)
src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp (1)
464-471: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Stale comment: "fanin_count is set by scheduler during wiring".
Wiring is now performed inline by the orchestrator (per the updated comments immediately above at Lines 448-455 and the Step 6 rewrite at Lines 868-884), not deferred to a scheduler thread. This leftover comment on Line 470 contradicts the new model and should be updated for consistency.
📝 Proposed fix
- // fanin_count is set by scheduler during wiring
+ // fanin_count is set by wire_task during inline orchestrator-side wiring
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp` around lines 464 - 471, The comment on the slot setup in pto_orchestrator.cpp is stale and contradicts the current inline wiring flow. Update the note near out->slot_state and scope_tasks_push in the orchestrator path to reflect that wiring now happens inside the orchestrator (consistent with the comments around the earlier setup and the Step 6 rewrite), and remove any mention that fanin_count is set by a scheduler during wiring.
src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h (1)
1057-1061: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Stale comment: "wiring SPSC buffer" no longer exists.
reserve_layout's Phase 1 doc still lists "wiring SPSC buffer" as one of the sub-regions declared, but this PR removes that subsystem entirely (per PTO2SchedulerLayout at Line 404-411). This leftover reference will mislead future readers about what reserve_layout actually reserves.
📝 Proposed fix
- // Phase 1: declare every sub-region (ready_queue slots, dummy queue slots,
- // per-ring dep_pool entries, wiring SPSC buffer) on the supplied arena.
+ // Phase 1: declare every sub-region (ready_queue slots, dummy queue slots,
+ // per-ring dep_pool entries) on the supplied arena.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h` around lines 1057 - 1061, The Phase 1 comment for reserve_layout is stale because it still mentions the removed wiring SPSC buffer even though PTO2SchedulerLayout no longer includes that subsystem. Update the doc comment near reserve_layout to describe only the regions that are actually reserved now (for example ready_queue slots, dummy queue slots, and per-ring dep_pool entries) and remove any reference to wiring SPSC buffer so the comment matches the current layout.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 993e3b22-c49a-49f4-a8cd-fb7a4e217c49
📒 Files selected for processing (19)
- simpler_setup/tools/swimlane_converter.py
- src/a2a3/platform/include/common/l2_swimlane_profiling.h
- src/a2a3/platform/shared/host/l2_swimlane_collector.cpp
- src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md
- src/a2a3/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md
- src/a2a3/runtime/tensormap_and_ringbuffer/docs/SUBMIT_BY_CLUSTER.md
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_cold_path.cpp
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_context.h
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_types.h
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp
- tests/ut/cpp/CMakeLists.txt
- tests/ut/cpp/a2a3/test_spsc_queue.cpp
- tests/ut/cpp/a2a3/test_wiring.cpp
💤 Files with no reviewable changes (9)
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h
- tests/ut/cpp/a2a3/test_spsc_queue.cpp
- src/a2a3/platform/include/common/l2_swimlane_profiling.h
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_types.h
- tests/ut/cpp/CMakeLists.txt
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp
- src/a2a3/platform/shared/host/l2_swimlane_collector.cpp
- src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp
- tests/ut/cpp/a2a3/test_wiring.cpp
Summary
Moves all task fanout wiring from the scheduler's deferred wiring queue onto the orchestrator, and deletes the wiring-queue subsystem entirely.
- The orchestrator wires each task inline at submit time: dep_pool.ensure_space (block / reclaim / overflow-detect) → wire_task. This mirrors the fanin_pool.ensure_space already on the submit path.
- dep_pool becomes orchestrator-exclusive (single writer) → no lock needed. The dep_pool_lock added in #1263 (optimize: prewire task dependencies on orchestrator side) was only required by the hybrid O+S design; a single-wirer model is lock-free. The only cross-thread serialization remains the per-producer fanout_lock, shared with the scheduler's completion handler.
- Deleted: PTO2SpscQueue, WiringState, drain_wiring_queue, report_wiring_deadlock, producer_blocked, orch_needs_drain, dep_deadlock_reported, and the Wire swimlane phase.
- Forward-progress safety moves to PTO2DepListPool::ensure_space's existing no-progress overflow detector (same PTO2_ERROR_DEP_POOL_OVERFLOW fatal).
Why
A single wirer gives a simpler model — lock-free dep_pool, no SPSC queue, no batch/backoff machinery. The benchmark below tests whether the wiring queue's batching was still pulling its weight.
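As a rough illustration of the single-wirer model, the sketch below keeps pool allocation lock-free because only one thread ever allocates, and serializes only on a per-producer lock shared with the completion side. All names (`EdgePool`, `EdgeEntry`, `Producer`, `wire_edge`, `complete`) are hypothetical, `std::mutex` stands in for whatever `fanout_lock` really is, and the actual `wire_task` logic is not shown in this PR page.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

struct EdgeEntry { int consumer; int next; };  // next == -1 ends the list

// Written by a single thread only (the orchestrator), so alloc takes no lock.
struct EdgePool {
    std::vector<EdgeEntry> entries;
    std::size_t top = 0;
    explicit EdgePool(std::size_t capacity) : entries(capacity) {}
    int alloc(int consumer, int next) {
        assert(top < entries.size());  // space was reserved beforehand
        entries[top] = EdgeEntry{consumer, next};
        return static_cast<int>(top++);
    }
};

struct Producer {
    std::mutex fanout_lock;  // the only cross-thread serialization point
    int fanout_head = -1;
    bool completed = false;
};

// Orchestrator side: returns false when the producer has already completed,
// meaning the dependency is already satisfied and no edge is recorded.
bool wire_edge(EdgePool& pool, Producer& p, int consumer) {
    std::lock_guard<std::mutex> guard(p.fanout_lock);
    if (p.completed) return false;
    p.fanout_head = pool.alloc(consumer, p.fanout_head);
    return true;
}

// Completion side: marks the producer done and collects its consumers.
std::vector<int> complete(const EdgePool& pool, Producer& p) {
    std::lock_guard<std::mutex> guard(p.fanout_lock);
    p.completed = true;
    std::vector<int> consumers;
    for (int i = p.fanout_head; i != -1;
         i = pool.entries[static_cast<std::size_t>(i)].next) {
        consumers.push_back(pool.entries[static_cast<std::size_t>(i)].consumer);
    }
    return consumers;
}
```

The pool is pre-sized and never reallocates, so the completion side can read entries it reached through the locked head while the orchestrator writes new ones elsewhere in the array.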
Measured impact (a2a3 onboard, device 2, 100 rounds, trimmed mean; vs mainline c4cfc968)
Neutral on S-bound workloads (incl. qwen3); regresses O-bound ones (where Orch ≈ Effective) by 4.6–9.4%. Orch rises everywhere because the wiring work shifts S→O; where the orchestrator was already the bottleneck, that increase lands directly on Effective. The wiring queue's batching was load-bearing for those cases.
This is opened to get CI/review input on the full-removal approach versus the hybrid in #1263 (keep the queue + add O-side fast paths), which avoids these regressions. It is not a clear net win as-is.
Testing
- a2a3sim tensormap_and_ringbuffer suite: 30 passed, 1 skipped.
- test_spsc_queue.cpp (a2a3) removed, drain cases dropped from test_wiring.cpp (wire_task cases retained).
Known caveat
PTO2_SERIAL_ORCH_SCHED=1 (default-off, debug-only) now requires a graph's in-flight fanout to fit in dep_pool, since no task completes during orchestration to reclaim it. Documented in RUNTIME_LOGIC.md.
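The caveat can be illustrated with a toy no-progress detector. This is an assumed shape, not the real `PTO2DepListPool::ensure_space`: `RingPool`, the `reclaim` callback, and the `max_idle_polls` threshold are all hypothetical. The point is only that when nothing can be reclaimed (as in serial mode, where no task completes during orchestration), a request larger than the free space ends in overflow rather than a hang.

```cpp
#include <cassert>
#include <cstdint>

// Toy pool: head counts entries ever allocated, tail counts entries reclaimed.
struct RingPool {
    int32_t capacity;
    int32_t head = 0;
    int32_t tail = 0;
    int32_t free_entries() const { return capacity - (head - tail); }
};

// Polls `reclaim` until `need` entries are free. Returns false (overflow)
// after `max_idle_polls` consecutive polls that reclaimed nothing.
template <class Reclaim>
bool ensure_space(RingPool& pool, int32_t need, Reclaim reclaim, int max_idle_polls) {
    int idle = 0;
    while (pool.free_entries() < need) {
        const int32_t got = reclaim();
        if (got > 0) {
            pool.tail += got;  // progress: completed tasks released entries
            idle = 0;
        } else if (++idle >= max_idle_polls) {
            return false;      // no progress: report overflow instead of spinning
        }
    }
    return true;
}
```

With a reclaim callback that always returns 0, the call succeeds only if the request already fits, which matches the requirement that the graph's in-flight fanout fit in dep_pool under serial orchestration.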