Skip to content

Latest commit

 

History

History
406 lines (331 loc) · 14.2 KB

File metadata and controls

406 lines (331 loc) · 14.2 KB

Scheduler — DAG Dispatch Internals

The Scheduler is the DAG executor. A dedicated C++ thread that consumes submitted slots, wires fanout edges, dispatches ready tasks to worker threads, and handles completion callbacks. It is the bridge between the Orchestrator (producer of DAG nodes) and the WorkerManager (consumer of ready nodes).

For the high-level role of the Scheduler among the three engine components, see hierarchical_level_runtime.md. For the DAG construction side (what feeds the Scheduler), see orchestrator.md. For dispatch mechanics (how WorkerThread::dispatch actually runs a task), see worker-manager.md.


1. Role

The Scheduler's job:

  • Drain the wiring queue (Phase 0): wire fanout edges for newly submitted slots; if all producers are already done, promote to the ready queue.
  • Drain the ready queue (Phase 1): for each ready slot, pick an idle WorkerThread from the appropriate pool and hand off.
  • Drain the completion queue (Phase 2): for each worker completion, transition the slot to COMPLETED or FAILED, release fanout references, wake or poison downstream consumers, and (if all refs released) retire the ring slot.

One Scheduler per Worker instance, one thread per Scheduler. The Scheduler does not inspect task data — it moves slot ids between queues and consults scheduling metadata (fanin_count, fanout_consumers, state).


2. The queues

class Scheduler {
    // Producer: Orchestrator.submit_*. Consumer: Scheduler's own loop, Phase 0.
    LockFreeQueue<WiringEntry> wiring_queue_;       // {slot, producers}

    // Strict-4 — per-worker-type ready queues.
    // Producers: Orchestrator.submit_* (routes by slot.worker_type) +
    //            Scheduler Phase 0 / Phase 2 (fanout-released; routes by
    //            consumer worker_type).
    // Consumer: Scheduler's own loop, Phase 1 (one drain loop per queue,
    //           each with its own head-of-line break).
    ReadyQueue *ready_next_level_queue_;
    ReadyQueue *ready_sub_queue_;

    // Producer: WorkerThread (on endpoint->run() return).
    // Consumer: Scheduler's own loop, Phase 2.
    std::queue<WorkerCompletion> completion_queue_;
};

Wiring queue

Introduced so that Orchestrator::submit_* does not need to acquire fanout_mu on every producer slot at submit time (see orchestrator.md §2 step 6).

Each entry:

struct WiringEntry {
    TaskSlot consumer;
    std::vector<TaskSlot> producers;    // producers this consumer depends on
};

Ready queue

Slots whose fanin_count == fanin_released are ready to dispatch. The queue holds just the slot id; dispatch reads task data from the ring.slot_state(sid) pool.

Strict-4 — per-worker-type split. In practice the ready queue is two ReadyQueue instances, one per WorkerType:

ReadyQueue ready_next_level_queue_;   // WorkerType::NEXT_LEVEL tasks
ReadyQueue ready_sub_queue_;          // WorkerType::SUB tasks

Matching L2's per-shape ready queues (the shared MPMC ready_queues[] split by AIC / AIV / MIX), with the L3+ exception that we use std::queue (Allowed Exception 3: dynamic data structures on host) and only two worker types (Allowed Exception 2: NEXT_LEVEL + SUB at L3+, not AIC / AIV / MIX). Orchestrator::submit_* routes each slot to the queue matching slot.worker_type; Scheduler::on_task_complete routes a newly-ready consumer the same way, based on the consumer's worker type. Scheduler::dispatch_ready drains each queue with its own head-of-line break so a saturated pool of one type cannot stall dispatch for the other.

Completion queue

Endpoint completions whose worker returned or failed. Each WorkerCompletion carries {slot, group_index, outcome, error_message}; outcome is success, task failure, endpoint failure, or skipped. The Scheduler runs completion handling (fanout release, downstream wake/poison, try_consume) in its own thread so that WorkerThreads can immediately return to their next task.


3. Scheduler loop (pseudocode)

void Scheduler::run() {
    while (running_) {
        // Phase 0: wiring
        WiringEntry w;
        while (wiring_queue_.try_pop(w)) {
            wire_fanout(w);   // see §4
        }

        // Phase 1: dispatch (drains BOTH per-type queues; see §5)
        dispatch_ready();

        // Phase 2: completion
        WorkerCompletion c;
        while (completion_queue_.try_pop(c)) {
            on_task_complete(c);   // see §6
        }

        // If all three queues empty, block on a condition variable until
        // any producer signals work.
        wait_for_work();
    }
}

Phase order matters:

  • Wiring before dispatch: a task may become ready during wiring (all its producers already completed); wiring promotes it to ready_queue in the same Scheduler iteration.
  • Dispatch before completion: dispatch the backlog first to keep workers busy; completion handling is not time-critical (fanout release just queues more work for the next iteration).

4. Phase 0 — wiring

void Scheduler::wire_fanout(const WiringEntry &w) {
    TaskSlot csid = w.consumer;
    TaskSlotState &c = slots_[csid];
    int32_t actual_live = 0;

    for (TaskSlot psid : w.producers) {
        TaskSlotState &p = slots_[psid];
        std::lock_guard lk(p.fanout_mu);
        // COMPLETED producers are already done; FAILED producers poison this
        // consumer instead of making it ready.
        if (p.state.load() == TaskState::COMPLETED ||
            p.state.load() == TaskState::CONSUMED) continue;
        if (p.state.load() == TaskState::FAILED) {
            poison_task(csid, p.failure_message);
            continue;
        }
        p.fanout_consumers.push_back(csid);
        p.fanout_total++;
        actual_live++;
    }

    // Update consumer's fanin to the actual live count (producers already
    // finished don't count).
    c.fanin_count = actual_live;
    if (actual_live == 0) {
        // Strict-4: wiring promotes directly to the per-type queue.
        auto *q = (c.worker_type == WorkerType::NEXT_LEVEL) ? ready_next_level_queue_
                                                            : ready_sub_queue_;
        q->push(csid);
    }
}

Race with completion: a producer may finish between submit and wiring. The lock_guard(p.fanout_mu) + p.state.load() check ensures we either:

  • wire an edge and the producer's future completion will fire fanin_released++ for this consumer, or
  • see "already completed" and skip, correctly counting this producer as not contributing to fanin.

5. Phase 1 — dispatch

dispatch_ready drains each per-type ready queue with its own head-of-line break so one saturated pool cannot stall the other:

void Scheduler::dispatch_ready() {
    auto drain_one = [&](ReadyQueue *q) {
        TaskSlot slot;
        while (q->try_pop(slot)) {
            TaskSlotState &s = slots_[slot];
            int N = s.group_size();  // 1 for single-task slots

            std::vector<WorkerThread *> workers;
            for (int i = 0; i < N; i++) {
                WorkerThread *wt = nullptr;
                int32_t affinity = s.get_affinity(i);
                if (affinity >= 0) {
                    wt = (s.worker_type == WorkerType::NEXT_LEVEL)
                        ? manager_->get_worker_by_id(s.worker_type, affinity)
                        : manager_->get_worker_by_index(s.worker_type, affinity);
                    if (wt && (!wt->idle() || !s.worker_allowed(i, wt->worker_id())))
                        wt = nullptr;
                } else {
                    wt = manager_->pick_idle(
                        s.worker_type, workers, s.eligible_workers_for(i));
                }
                if (!wt) break;
                workers.push_back(wt);
            }
            if (static_cast<int>(workers.size()) < N) {
                q->push(slot);   // put back; try again after a completion
                break;
            }
            s.state.store(TaskState::RUNNING);
            for (int i = 0; i < N; i++) {
                workers[i]->dispatch({slot, i});
            }
        }
    };
    drain_one(ready_next_level_queue_);
    drain_one(ready_sub_queue_);
}

Dispatch hands off a WorkerDispatch {slot, group_index} to a WorkerThread. The WorkerThread reads ring.slot_state(slot).{callable, task_args, config} on its own thread and encodes it into the per-WT mailbox — see worker-manager.md §3 for the dispatch protocol.

Pick-idle back-pressure: when the manager cannot provide enough idle workers that also satisfy affinity and worker eligibility, the slot is pushed back onto its queue and that queue's drain halts; the other-type queue's drain continues. The ring's back-pressure at the Orch side already caps the total number of in-flight tasks across both types.

Worker eligibility is opaque scheduling metadata. The Scheduler compares worker ids and capability bits exposed through WorkerEndpoint::caps(), but does not inspect HCOMM, RDMA, socket, or remote buffer internals. For NEXT_LEVEL affinity, s.get_affinity(i) is a stable worker id and can be different from the next_level_threads_ vector index. SUB affinity is not public and keeps the internal index semantics.


6. Phase 2 — completion

Called by WorkerThread::on_complete_(completion) which pushes to completion_queue_. The Scheduler then:

void Scheduler::on_task_complete(const WorkerCompletion &completion) {
    TaskSlot sid = completion.task_slot;
    TaskSlotState &s = slots_[sid];

    // Group tasks aggregate per-member outcomes before the slot is terminal.
    if (s.group_size > 0) {
        if (!record_group_member_completion(completion)) return;
    }

    bool failed = completion.outcome != EndpointOutcome::SUCCESS;
    s.state.store(failed ? TaskState::FAILED : TaskState::COMPLETED);

    // Release fanout refs on downstream consumers
    std::vector<TaskSlot> consumers;
    {
        std::lock_guard lk(s.fanout_mu);
        consumers = s.fanout_consumers;    // snapshot (mutex protects vector)
    }
    for (TaskSlot csid : consumers) {
        if (failed) {
            poison_task(csid, completion.error_message);
            continue;
        }
        TaskSlotState &c = slots_[csid];
        if (++c.fanin_released == c.fanin_count) {
            // Strict-4: push to the queue matching the *consumer's*
            // worker type. A consumer of a NEXT_LEVEL producer can itself
            // be SUB, so we pick based on `c.worker_type`, not `s`.
            auto *q = (c.worker_type == WorkerType::NEXT_LEVEL) ? ready_next_level_queue_
                                                                : ready_sub_queue_;
            q->push(csid);
        }
    }

    // Also: this task itself may now be CONSUMED
    try_consume(sid);
}

try_consume

void Scheduler::try_consume(TaskSlot sid) {
    TaskSlotState &s = slots_[sid];
    if (s.state.load() != TaskState::COMPLETED &&
        s.state.load() != TaskState::FAILED) return;
    if (s.fanout_released.load() != s.fanout_total) return;

    s.state.store(TaskState::CONSUMED);

    // Erase tensormap entries this task produced
    for (int i = 0; i < s.task_args.tensor_count(); i++) {
        // only erase entries still pointing at this slot
        uint64_t ptr = s.task_args.tensor(i).data;
        if (orchestrator_->tensormap_lookup(ptr) == sid)
            orchestrator_->tensormap_erase(ptr);
    }

    // Return slot to ring pool
    ring_->release(sid);
    s.state.store(TaskState::FREE);
}

Scope release (when scope_end runs) calls back into the Scheduler to bump fanout_released by 1 on each scope-registered slot, triggering try_consume. This is how leaf tasks get reclaimed.


7. Start / Stop

void Scheduler::start(Config cfg) {
    manager_ = cfg.manager;
    orchestrator_ = cfg.orchestrator;
    running_.store(true);
    thread_ = std::thread([this] { run(); });
}

void Scheduler::stop() {
    running_.store(false);
    wake();
    thread_.join();
}

Worker::init calls start after all children are registered and WorkerManager::start has spawned the WorkerThread pool.


8. Completion channel from WorkerThread

// In WorkerThread, after endpoint->run() returns:
void WorkerThread::loop() {
    for (;;) {
        TaskSlot sid = queue_.pop();
        WorkerCompletion c = endpoint_->run(ring_, {sid, group_index});
        scheduler_->completion_queue_.push(c);   // notify Scheduler
    }
}

The completion path is one-way and asynchronous: the WorkerThread returns to its own queue immediately, and the Scheduler handles completion in its own loop. This keeps worker dispatch latency bounded by dispatch cost alone, not by completion-handling cost.


9. Invariants

  1. Scheduler is single-threaded: all three phase handlers run in the Scheduler's own thread. Atomics/mutexes on slot state are only needed for Orch/WorkerThread ↔ Scheduler coordination.
  2. Slot transitions are monotonic: success follows FREE → PENDING → READY → RUNNING → COMPLETED → CONSUMED; failure follows FREE → PENDING/READY/RUNNING → FAILED → CONSUMED.
  3. Dispatch consumes one ready entry: every ready_queue.push is matched by exactly one pick_idle + dispatch. Group tasks push once, dispatch N times via pick_n_idle.
  4. Completion is per-worker for groups: worker_done is called group_size times; only the terminal aggregate pushes one slot completion. If any member fails, not-yet-dispatched members become skipped and already running members are allowed to finish.
  5. Failed producers poison consumers: consumers of a failed producer move to FAILED, are never dispatched, and still run normal cleanup.
  6. try_consume is idempotent on CONSUMED: a repeated call after CONSUMED is a no-op.

10. Related