optimize: prewire task dependencies on orchestrator side #1263
Crane-Liu wants to merge 5 commits into
Move selected wiring work to the orchestrator side while preserving scheduler fallback, ready-once routing, and concurrent dummy drain. Co-authored-by: TaoZQY <zhangtaolqy@mail.ustc.edu.cn>
Keep the O-side prewire and ready-once scheduler path while adopting upstream local-buffer removal. Co-authored-by: TaoZQY <zhangtaolqy@mail.ustc.edu.cn>
Walkthrough
This PR reworks the orchestrator/scheduler dependency-wiring pipeline: adding an explicit
Changes
Inline Wiring, Ready-State, and Wake-List Discovery
Estimated code review effort: 5 (Critical) | ~110 minutes
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant submit_task_common
participant all_claimed_fanin_completed
participant try_orch_prewire_task
participant route_ready_once
participant WiringQueue
Caller->>submit_task_common: submit task
submit_task_common->>all_claimed_fanin_completed: check claimed producers
alt zero fanin or all completed
submit_task_common->>route_ready_once: route inline-ready
else fanin pending
submit_task_common->>try_orch_prewire_task: attempt prewire (dep_pool_lock)
alt prewire succeeds
try_orch_prewire_task-->>submit_task_common: wired
else prewire fails
submit_task_common->>WiringQueue: push task (spin if full)
end
end
sequenceDiagram
participant Task
participant on_task_complete
participant WakeListState
participant route_ready_once
participant pending_ready
Task->>on_task_complete: task finishes
on_task_complete->>on_task_complete: slot_state.mark_completed()
alt discovery enabled
on_task_complete->>WakeListState: drain_wake_list
WakeListState->>route_ready_once: route woken consumer
route_ready_once->>pending_ready: requeue if fanin missing
end
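The completion flow in the second diagram can be pictured with a small single-threaded model. This is only a sketch under stated assumptions: the `Task` and `Queues` types, the `remaining_fanin` counter, and the `routed` flag are hypothetical stand-ins, and the function bodies are guesses at the intent of `route_ready_once` and `on_task_complete`, not the PR's code.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical task record; field names are illustrative only.
struct Task {
    bool completed = false;
    int remaining_fanin = 0;          // producers that have not completed yet
    std::atomic<bool> routed{false};  // ready-once guard
    std::vector<Task*> wake_list;     // consumers to wake when this task completes
};

struct Queues {
    std::deque<Task*> ready;
    std::deque<Task*> pending_ready;  // woken consumers whose fanin is still missing
};

// Route a task to the ready queue at most once; park it if fanin is still missing.
inline void route_ready_once(Task* t, Queues& q) {
    if (t->remaining_fanin > 0) {
        q.pending_ready.push_back(t);
        return;
    }
    bool expected = false;
    if (t->routed.compare_exchange_strong(expected, true)) {
        q.ready.push_back(t);
    }
}

// Mark the producer completed, then (if discovery is enabled) drain its wake list.
inline void on_task_complete(Task* done, Queues& q, bool discovery_enabled) {
    done->completed = true;
    if (!discovery_enabled) return;
    for (Task* consumer : done->wake_list) {
        --consumer->remaining_fanin;
        route_ready_once(consumer, q);
    }
    done->wake_list.clear();
}
```

The compare-exchange on `routed` is what keeps a consumer from being enqueued twice when several producers finish close together. The model leaves stale `pending_ready` entries in place, which a real scheduler would need to handle.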
Pre-merge checks: 5 of 5 passed
Code Review
This pull request optimizes the task scheduling and wiring pipeline by allowing the orchestrator to inline already-ready tasks and prewire tasks directly, bypassing the scheduler's wiring queue. To support this, thread-safe locking is added to the dependency pool. Additionally, a disabled-by-default "discovery" wake-up mechanism is introduced to optimize task wake-ups, and dummy ready queue draining is parallelized across multiple scheduler threads. There are no review comments, so I have no feedback to provide.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp`:
- Around line 906-925: Preserve the dep_pool_mark for inline-ready tasks in PTO
orchestrator flow: in the inline-ready branches around route_orch_inline_ready
and try_orch_prewire_task, the slot is being made ready without going through
wire_task(), so the oldest live slot can end up with a zero mark and block
PTO2DepListPool::reclaim(). Update the inline path to assign the same
dep_pool_mark that wire_task() would have established before marking the task
ready, using the existing fanin_builder/current slot state helpers in
pto_orchestrator.cpp so dep-pool tail advancement still works when an inline
task becomes the head of the live window.
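To make the concern concrete, here is a toy model of mark-based reclamation. It is a sketch under loud assumptions: `DepPool`, `Slot`, and `mark_inline_ready` are hypothetical, the convention that a mark of 0 means "unset" is taken from the comment's wording, and the real `PTO2DepListPool::reclaim()` and `wire_task()` may work quite differently.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical dep-list pool reclaimed through per-slot marks.
struct DepPool {
    uint32_t head = 1;  // next free entry; 0 is reserved to mean "no mark"
    uint32_t tail = 1;  // oldest entry still considered in use

    // Advance the tail up to the mark of the oldest live slot.
    // Returns false when that mark is unset, so nothing can be reclaimed.
    bool reclaim(uint32_t oldest_live_mark) {
        if (oldest_live_mark == 0) return false;
        if (oldest_live_mark > tail) tail = oldest_live_mark;
        return true;
    }
};

struct Slot {
    uint32_t dep_pool_mark = 0;
};

// Assumed behavior of the wired path: allocate entries, then record the pool position.
inline void wire_task(Slot& s, DepPool& pool, uint32_t entries) {
    pool.head += entries;
    s.dep_pool_mark = pool.head;
}

// Inline-ready path with the suggested fix: no entries are allocated,
// but the current pool position is still recorded as the slot's mark.
inline void mark_inline_ready(Slot& s, const DepPool& pool) {
    s.dep_pool_mark = pool.head;
}
```

In this model an inline-ready slot that skips the mark leaves `reclaim()` stuck once it becomes the oldest live slot, while recording the current head lets the tail move past entries allocated by earlier tasks.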
📒 Files selected for processing (5)
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp`
Co-authored-by: Crane-Liu <c.wliu@outlook.com>
Move the live-fanin dep_pool wait out of PTO2DepListPool::ensure_space (fast 100k-spin fatal in <1ms) into the orchestrator's orch_wire_live_fanin_task, mirroring the fanin spill pool's ensure_space backstop: an absolute time budget plus a check for a fatal already latched elsewhere.
With wiring now on the orchestrator, a workload that stalls the scheduler (e.g. the scheduler_timeout fatal-code test, which lowers PTO2_SCHEDULER_TIMEOUT_MS) used to see O fatal PTO2_ERROR_DEP_POOL_OVERFLOW (-4) at submit time within <1ms, masking the real scheduler-timeout code (-100). Now O bails without latching when sched_error_code / orch_error_code is already set, so the root-cause code surfaces.
The residual dep_pool-deadlock backstop is sized to run strictly longer than the scheduler timeout (scheduler timeout + alloc-deadlock slack) so the scheduler's own fault is always observed first, instead of racing a fixed 500ms budget that ties with a 500ms scheduler timeout under load.
Applied to a2a3 and a5.
Test: tests/st/runtime_fatal_codes (sim, a2a3+a5): scheduler_timeout surfaces -100, dep_pool_overflow still -4, all cases pass.
Co-Authored-By: Claude <noreply@anthropic.com>
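The wait policy this commit message describes can be sketched as a bounded spin that checks for an already-latched fatal before giving up. This is an illustration only: `wait_for_dep_pool_space`, `WaitResult`, and `dep_pool_budget` are hypothetical names, the error codes are modeled as plain atomics, and the real `orch_wire_live_fanin_task` is not shown in this page.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>

enum class WaitResult { Acquired, BailedOnExistingFatal, TimedOut };

// Spin until space is available, a fatal is already latched elsewhere,
// or the absolute time budget runs out.
inline WaitResult wait_for_dep_pool_space(
    const std::function<bool()>& has_space,
    const std::atomic<int32_t>& sched_error_code,
    const std::atomic<int32_t>& orch_error_code,
    std::chrono::milliseconds budget) {
    const auto deadline = std::chrono::steady_clock::now() + budget;
    for (;;) {
        if (has_space()) return WaitResult::Acquired;
        if (sched_error_code.load(std::memory_order_acquire) != 0 ||
            orch_error_code.load(std::memory_order_acquire) != 0) {
            // Another component already reported the root cause: leave without
            // latching a second code so the original one surfaces.
            return WaitResult::BailedOnExistingFatal;
        }
        if (std::chrono::steady_clock::now() >= deadline) {
            return WaitResult::TimedOut;  // caller would latch the overflow code here
        }
    }
}

// Budget sized as scheduler timeout plus slack, so that with a positive slack
// the scheduler's own timeout fires first.
inline std::chrono::milliseconds dep_pool_budget(std::chrono::milliseconds scheduler_timeout,
                                                 std::chrono::milliseconds slack) {
    return scheduler_timeout + slack;
}
```

Checking the error codes before the deadline means a latched scheduler fatal wins even when both conditions become true in the same iteration, which matches the ordering the message asks for.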
Summary
Move dependency wiring to the orchestrator side for the a2a3 and a5 `tensormap_and_ringbuffer` runtimes, and remove the old scheduler-side deferred wiring fallback.
The latest cleanup makes this PR a single Orch-side wiring design instead of a mixed O/S fallback path:
Modified Code
a2a3 runtime
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_cold_path.cpp`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.{cpp,h}`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h`
- `src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/{scheduler_context.h,scheduler_types.h}`
a5 runtime
- `src/a5/runtime/tensormap_and_ringbuffer/runtime/`.
Shared profiling / tooling
- `src/a2a3/platform/include/common/l2_swimlane_profiling.h`
- `src/a5/platform/include/common/l2_swimlane_profiling.h`
- `simpler_setup/tools/swimlane_converter.py`
These were updated so the old scheduler-wire swimlane/profiling label is treated as legacy scheduler wire phase metadata rather than an active deferred wiring phase.
Documentation Updates
Updated both a2a3 and a5 runtime docs:
- `src/a2a3/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md`
- `src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md`
- `src/a2a3/runtime/tensormap_and_ringbuffer/docs/SUBMIT_BY_CLUSTER.md`
- `src/a5/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md`
- `src/a5/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md`
- `src/a5/runtime/tensormap_and_ringbuffer/docs/SUBMIT_BY_CLUSTER.md`
The docs now describe:
Test Updates
- `tests/ut/cpp/a2a3/test_wiring.cpp`
- `tests/ut/cpp/a5/test_wiring.cpp`
Updated wiring tests to exercise Orch-side helpers directly.
Removed obsolete tests for the deleted SPSC wiring queue:
- `tests/ut/cpp/a2a3/test_spsc_queue.cpp`
- `tests/ut/cpp/a5/test_spsc_queue.cpp`
Updated:
- `tests/ut/cpp/CMakeLists.txt`
Current Behavior
- `wfanin == 0`: Orch seeds fanin state, records the dep-pool position, and routes the task ready.
- `wfanin > 0` and all producers are already complete: Orch seeds fanin/dispatch state and routes the task ready.
- `wfanin > 0` with unfinished producers: Orch wires fanout entries through the shared wiring helper before the task becomes visible to schedulers.
Results
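As a reading aid for the three submit-time cases listed under Current Behavior, the decision can be written as a small classifier. The names `SubmitPath`, `ProducerSlot`, and `classify_submit` are hypothetical and exist only for this sketch; the real orchestrator also seeds state and takes locks that are not modeled here.

```cpp
#include <cassert>
#include <vector>

// Hypothetical labels for the three submit-time outcomes.
enum class SubmitPath {
    ReadyNoFanin,           // wfanin == 0
    ReadyAllProducersDone,  // wfanin > 0, every producer already complete
    WiredToProducers        // wfanin > 0, at least one producer unfinished
};

// Hypothetical view of a producer's slot state.
struct ProducerSlot {
    bool completed;
};

// Classify a newly submitted task by the state of its producers.
inline SubmitPath classify_submit(const std::vector<const ProducerSlot*>& producers) {
    if (producers.empty()) return SubmitPath::ReadyNoFanin;
    for (const ProducerSlot* p : producers) {
        if (!p->completed) return SubmitPath::WiredToProducers;
    }
    return SubmitPath::ReadyAllProducersDone;
}
```

Only the third outcome needs fanout entries to be wired; the first two can be routed ready directly, which is where the saved scheduler work would come from.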
Previous 100-round Strategy Sweep
Benchmark: `qwen3_14b_decode`, `StressBatch16Seq3500`, `a2a3`, device 4, `tensormap_and_ringbuffer`, 100 rounds per run. Values below are trimmed averages, dropping 10 low and 10 high rounds.
Negative delta means faster.
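For readers who want to reproduce the aggregation, a trimmed average of this kind can be computed as below. This is a generic sketch, not the script used for the sweep: `trimmed_mean` and `delta_percent` are hypothetical helpers, and expressing the delta as a percentage relative to the baseline is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <numeric>
#include <vector>

// Mean of the samples after dropping the `trim` lowest and `trim` highest values.
inline double trimmed_mean(std::vector<double> samples, std::size_t trim) {
    assert(samples.size() > 2 * trim);
    std::sort(samples.begin(), samples.end());
    const auto first = samples.begin() + static_cast<std::ptrdiff_t>(trim);
    const auto last = samples.end() - static_cast<std::ptrdiff_t>(trim);
    return std::accumulate(first, last, 0.0) / static_cast<double>(last - first);
}

// Relative change in percent (assumed definition); negative means the candidate is faster.
inline double delta_percent(double baseline, double candidate) {
    return (candidate - baseline) / baseline * 100.0;
}
```

With 100 rounds and `trim = 10`, the average is taken over the middle 80 rounds, which damps outliers from either tail.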
[Results table not recovered; one column header ended in "simpler-base)".]
Readout:
Latest pypto-lib qwen14b Decode 40L Check
Command:
`task-submit --device 0 --max-time 0 --timeout 0 --run 'cd /data/pyptouser/zhangtao/zt/pypto-lb && .venv-bench/bin/python models/qwen3/14b/decode_layer.py -p a2a3 -d $TASK_DEVICE --validate-fwd --fwd-layers 40'`
Result from `after_lock_cleanup_qwen14b_40l_20260703_162637_dev0.log`:
[Table columns: device_wall, orch, sched, runner_run; values not recovered.]
Correctness:
- argmax match 16/16
- sample match 16/16
- logits 100.0000% within 5e-2
- max_abs_err=0.0199
Compared with the earlier device-0 baseline average for this 40L check, `device_wall` and `sched` are within about 0.5%, so the cleanup does not show an obvious performance regression.
Tests
- `git diff --check`
- `cmake --build tests/ut/cpp/build -j$(nproc)`
- `ctest --test-dir tests/ut/cpp/build --output-on-failure -R '^(test_wiring|test_a2a3_orchestrator_fanin|test_a5_wiring|test_a5_orchestrator_fanin)$'`
- `models/qwen3/14b/decode_layer.py -p a2a3 -d $TASK_DEVICE --validate-fwd --fwd-layers 40`