diff --git a/Cargo.lock b/Cargo.lock
index 9d721153..264978e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3957,6 +3957,7 @@ dependencies = [
  "pegaflow-core",
  "tokio",
  "tokio-stream",
+ "xxhash-rust",
 ]
 
 [[package]]
diff --git a/docs/index.md b/docs/index.md
index fb19ccc8..d1f7af48 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -65,6 +65,7 @@ Organized by domain (model line / subsystem / playbook / lesson) instead of by l
 | `models/glm52/ep8-deepep-moe.md` | PR4: GLM-baked DeepEP v2 shim instantiation replaces PR3's local scatter/combine; loader places experts into their packed layout at H2D time (post-load repack cannot fit HBM); rank 0 runs the full 78-layer spine + bs=1 greedy coordinator, ranks 1..7 replay the 75 MoE collectives per step. Gates: EP8 layer-6 oracle 62/64 (same outliers as EP1), full-model e2e generation. |
 | `models/glm52/ep1-forward.md` | PR3 built + all gates green on jz-38 H200 (2026-07-03): MoE/dense/bookend bricks (cherry-picked from the PP8 branch, re-gated via the #499 harness) + decoder-layer composition with cross-layer top-k sharing. MoE chain shaped to the DeepEP v2 elastic shim contract, Grouped + GEMV expert paths behind one signature; graph capturability as the bar. Gates: bookend exact, layer-0 dense 64/64, layer-6 MoE 62/64 both paths (measured router near-ties, bounded allowance). |
 | `models/glm52/bs1-decode-serial-overhead.md` | PR5a perf pass on the PR4 bring-up path: 101–103 → 46–50 ms/step (~2.2×) at bs=1, output byte-identical, all gates green. Fixes: quant/SiLU/GEMM rows bounded by the coordinator token count (device trap on violation), persistent MoE workspace (was ~11.6k allocs/step), FlashMLA sched metadata hoisted to build. Remaining gap = launch overhead → PR5c graph target. |
+| `models/glm52/pd-vllm-prefill.md` | Decision record: GLM prefill rides vLLM (TP8+EP8); openinfer's glm52 kernel surface is decode-only and attention has no parallelism, so self-built prefill ≈ another PR1–PR4 campaign.
P→D readiness over pegaflow CPU P2P = D-side bounded fast-poll (extend M2's RemoteFetch, no vLLM changes); front-loaded risk is cross-engine hash keys (vLLM xxh3_128/CBOR/PYTHONHASHSEED vs kvbm xxh3_64/seed-1337); openinfer-D grows a vllm-hash-compat key provider (pin `xxhash_cbor` + a shared PYTHONHASHSEED), plus a P/D-gated tail-block save extension in the connector (vLLM only hashes full pages; D is forbidden to prefill) with the router forwarding t1 into D's context. Roadmap: qwen3 vLLM-P + openinfer-D smoke test first (byte-compat oracle gate up front), GLM lands as milestone 2 after PR5. |
 
 ## models / deepseek-v4
diff --git a/docs/models/glm52/pd-vllm-prefill.md b/docs/models/glm52/pd-vllm-prefill.md
new file mode 100644
index 00000000..c94872ae
--- /dev/null
+++ b/docs/models/glm52/pd-vllm-prefill.md
@@ -0,0 +1,132 @@
+# GLM5.2 P/D: vLLM prefill + openinfer decode (decision record)
+
+> **TL;DR:** **Milestone 1 (qwen3 smoke) is working**: with vLLM-P + openinfer-D on node 34, greedy output for three prompt sizes is **byte-identical** to the direct baseline, and multi-turn delta reuse is confirmed (an aligned prompt pulls only 1 incremental block). TTFT handoff overhead: 470 tok **+14ms** / 1.8k **+51ms** / 7k +147ms (of which RDMA moves 992MiB in 42ms @23.7GiB/s; the target is met for ≤2k). The compatibility equations that made it work are in §3.1. The three core ones: P's `--block-size` must equal D's GPU page size (hash granularity, seal granularity, and the 1:1 load mapping are all anchored to it); both sides must register the same number of per-layer slots (the vLLM fork defaults qwen3 to the v2 runner = 36 per-layer slots; **cross-layer is a dead end**); the K/V segment layout difference inside a slot is absorbed by the pegaflow #382 fix (a Contiguous device layout loading the two-segment blocks of a split-KV peer). The original decisions stand: GLM prefill rides vLLM (TP8+EP8), P→D readiness = D-side bounded fast-poll, and a D-side vllm-hash-compat provider (pin `xxhash_cbor` + one shared `PYTHONHASHSEED`). **Not done yet**: the tail-block connector extension + router variant A (D currently computes the ≤block_size tail as a fallback), and the strict no-prefill 429/500 switch (a miss currently waits 5s and then falls back to scratch). GLM integration is milestone 2 (depends on PR5).
+>
+> **Last touched:** 2026-07
+
+Related: `dp1-ep8-decode-plan.md` (decode-side PR5 is the critical path for bringing up the D node) · `../qwen3/pd-disaggregation-m2.md` (the full M2 flow; this document reuses many of its conclusions) · pegaflow `docs/pd.md` / `docs/pd-rdma-push.md`.
+
+## 1.
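Decision 1: GLM prefill rides vLLM
+
+### The bill for building prefill inside openinfer (audit result, 2026-07)
+
+The decode-only assumption is welded into every layer. Evidence:
+
+| Component | Decode-only evidence |
+| --- | --- |
+| FlashMLA sparse attention | AOT instantiation hard-codes `kSq = 1` (`csrc/glm52/glm52_flashmla_sparse.cu:16`); only `sm90/decode/sparse_fp8/splitkv_mla.cuh` is compiled |
+| DeepGEMM indexer logits | Only the paged variant is wrapped, `next_n ∈ {1,2}` (`ops/glm52/deepgemm_mqa.rs:35`) |
+| DeepEP MoE all-to-all | The shim instantiates only the elastic **decode** dispatch/combine, capped at 128 tokens/rank, optimized for latency rather than throughput |
+| Executor | bs=1 serial coordinator, "prefill rides decode token-by-token" (`runner.rs:416`), bring-up only |
+| **Attention parallelism** | **Does not exist.** DP1: attention/dense/bookend all run on rank 0, and ranks 1–7 act only as MoE expert workers. Prefill is compute-bound, so single-GPU attention throws away 7/8 of the FLOPs |
+
+Building it ourselves would require: attention distribution (TP8-MLA or DP-attention, entirely new) + a new AOT/wrapper for FlashMLA sparse prefill + DeepGEMM `fp8_mqa_logits` (the prefill variant is already vendored but has no wrapper) + a torch-free shim for DeepEP normal mode + a chunked-prefill executor/scheduler (PR5 itself has not landed). That is weeks of work, and it occupies 8×H200 for gating the whole time.
+
+### Why the vLLM side is smooth
+
+- In vLLM, GLM5.2 is `GlmMoeDsaForCausalLM(DeepseekV2ForCausalLM)`, a one-line subclass, and TP8+EP8 prefill works out of the box; vLLM is already this campaign's production reference.
+- The KV format on both sides is **two implementations of the same kernel contract**: MLA cache at 656 B/token, page 64 (512 fp8 nope + 4×f32 scale + 64×bf16 rope) = vLLM's `fp8_ds_mla`; the indexer K cache uses the DeepGEMM block-split paged layout on both sides. This needs byte-level verification, but it is not a format conversion.
+- The pegaflow→openinfer KV path was already opened in M2 (openinfer #522 / pegaflow #381); GLM extends it to two caches: 78 layers of MLA + 21 layers of indexer K cache (the 57 shared layers have no indexer).
+
+**If prefill is ever taken back in-house**: the more natural shape is DP-attention + EP8 (MLA is MQA and the latent cache is held in full on every rank, so it composes directly with the existing DeepEP shim), not TP8. This would be triggered by real traffic measurements once the decode side is stable.
+
+## 2.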
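Decision 2: P→D readiness signal = D-side bounded fast-poll
+
+### The problem
+
+On the pegaflow CPU P2P path, after P finishes a request there is an offload tail: D2H save (background thread) → write-pipeline seal → metaserver registration (**fire-and-forget**, event-driven batching, `pegaflow-core/src/metaserver_client.rs:188-222`). The router is a synchronous flow: it waits for P's HTTP first-token response and then forwards to D, **without waiting for registration** (`pegaflow-router.rs:206`). So D may query before the registration lands → zero-hit.
+
+This race does not exist in M2: openinfer-P's `flush_on_finish` holds the `Finished` event until the registration is visible to peers, so P's response is the ready signal. With vLLM as P that guarantee disappears: vLLM's `wait_for_save` only enqueues and returns (the scheduler does not block, `connector/worker.py:646-660`), and `get_finished` reporting "sent" does not wait for registration either. **The vLLM connector never calls the ready-made `flush_saves_and_registrations` barrier in pegaflow-core** (`lib.rs:738-748`, built for M2).
+
+Meanwhile, openinfer-D today: the `RemoteFetch` phase only waits on pegaflow's `Loading` (a fetch in flight); when the metaserver has no record, the query comes back empty immediately → `Scratch` → local prefill (`executor/remote_fetch.rs:78-80`).
+
+### Trade-offs among the three candidates
+
+| Option | Verdict | Rationale |
+| --- | --- | --- |
+| Async `/kv_ready` callback from vLLM/router (planned in pd.md, never implemented) | ❌ | The router becomes stateful (pending table, timeout for lost callbacks, cleanup after a P crash); it changes both the vLLM connector and the router; serial-path latency is comparable to polling; the only gain is saving a few queries |
+| **D-side bounded fast-poll** | ✅ | The window is set by P's offload tail and is **bounded and short (tens of ms)**, so the "too fine and the metaserver cannot cope / too coarse and TTFT looks bad" dilemma does not arise: 5ms interval × window ≈ 2–10 queries per request, and QPS scales linearly with request rate rather than with concurrency; every change is in our own code |
+| RDMA doorbell (memory represents request state) | Deferred to M3 | pegaflow has already built the right shape: `pd-rdma-push` v2 + WRITE_WITH_IMM (phase P4, cross-node TP8 vLLM↔vLLM already works). When long prompts push the physical cost of the CPU round trip past the budget, the answer is to switch to M3 wholesale, not to poll more cleverly |
+
+### Change list
+
+1. **openinfer D** (the core, very small): add a branch to the decision function in `remote_fetch.rs`. When the request carries an "expect remote KV" marker, a zero/partial hit does not go to `Scratch`; it enters a re-query wait with its own short deadline (~500ms). Polling is driven by the existing per-tick mechanism, throttled with a ~5ms minimum interval. This function is the pure decision logic just extracted in #532, so unit tests come easily.
+2. **The "expect remote" marker**: either the router injects a field when forwarding to D, or D has a global setting "in P/D mode, cold requests wait by default".
+3. **GLM-specific wait predicate**: D must wait for a **full-prefix** hit before proceeding. Recomputing the suffix after a partial hit is graceful degradation on qwen3, but on GLM it means riding the decode kernels token by token, which is unacceptable. Likewise, the `Scratch` fallback on deadline expiry becomes fail/requeue for GLM (router re-dispatch or 503), exposed as a policy switch.
+4.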
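**(Optional) a small cut in the pegaflow connector**: after `_process_save_batch` finishes handling finished requests, call the ready-made `flush_saves_and_registrations` to squeeze out the registration tail. Measure the window distribution first and then decide; polling can absorb it anyway.
+
+### TTFT overhead budget (<50ms target, GLM 8k prompt ≈ 430MB KV)
+
+Physical cost: P-side tail D2H (streamed during chunked prefill, so the tail ≈ the last chunk) + D polling, expected ≤5ms + RDMA READ ~20ms (M2 measured 21.7 GiB/s) + H2D ~18ms. Total ~40–50ms, within budget but tight; **this is the structural cost of relaying through the CPU**. If long prompts break the budget, that triggers M3 (GPU→GPU push) rather than more work on polling.
+
+## 3. Front-loaded risk: cross-engine hash keys (the earliest thing that can be falsified, so do it first)
+
+M2's "handle-free" protocol relies on P and D deterministically deriving the same key. The two algorithms are **both in the xxhash family but entirely different**:
+
+| | vLLM | openinfer / kvbm |
+| --- | --- | --- |
+| Algorithm | `xxh3_128` (or `sha256_cbor`, configurable via `--prefix-caching-hash-algo`) | `xxh3_64` (dynamo `compute_hash_v2`) |
+| Input | CBOR-encoded `(parent_hash, token_ids_tuple, extra_keys)` (`vllm/v1/core/kv_cache_utils.py:563`) | Token block bytes, chained directly |
+| Chain root / seed | `NONE_HASH`: with `PYTHONHASHSEED` unset = `os.urandom(32)` (**not reproducible across instances**); when set = `hash_fn(seed)` | Fixed base seed **1337** (`ROUTER_XXH3_SEED`), salted for LoRA |
+
+**Decision: openinfer-D replicates vLLM's hash derivation; the pegaflow connector is not changed.** The alternative, "the connector recomputes an engine-neutral key", was rejected: we do not want to touch the connector, and dynamo upstream is trending toward aligning with vLLM's hash, at which point openinfer can switch to the upstream implementation for free. Pinning `PYTHONHASHSEED` is acceptable.
+
+Exact semantics to replicate (vLLM `utils/hashing.py` + `v1/core/kv_cache_utils.py:563`):
+
+- **`--prefix-caching-hash-algo xxhash_cbor` must be pinned.** The bare `xxhash`/`sha256` variants serialize with **pickle**, which Rust cannot replicate; the `_cbor` variants use `cbor2.dumps(input, canonical=True)` (RFC canonical CBOR), which can be replicated byte for byte. xxh3_128 → a 16-byte key, exactly as wide as openinfer's existing content key (the `[u8;16]` at `pool.rs:602`).
+- Per-block key = `xxh3_128(cbor((parent_hash: bytes, token_ids: tuple[int], extra_keys)))`, with `extra_keys = None` for text-only.
+- Chain root `NONE_HASH = xxh3_128(cbor(PYTHONHASHSEED string))`; **every P node and D's derivation config must pin the same seed** (when unset, vLLM uses `os.urandom(32)`, which is not reproducible across instances, so deployment validates this fail-fast).
+- Where it lands in Rust: a vllm-hash-compat key provider (canonical CBOR encoding of `(bstr, [uint...], null)` + xxh3_128 from `xxhash-rust`), side by side with the kvbm `SequenceHash` provider and selected in P/D mode. A pure function, unit-tested with golden vectors (captured from a real vLLM process).
+- **Drift guard**: vLLM's code comments state that the default algorithm is mid-migration; the compatibility gate must be rerun whenever the vLLM version on P nodes is upgraded. When extra_keys semantics (cache_salt/mm/LoRA) go beyond the text-only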
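scope, the gate goes red.
+
+Also align the namespace (pegaflow `derive_namespace(model, TP)`; openinfer queries with the same string. M2 already gave a cautionary tale: 4B/8B models with identical geometry silently cross-hit) and the hash block granularity (GLM page 64 = vLLM `hash_block_size`).
+
+### 3.1 Compatibility equations (settled by milestone 1 measurements, verified on qwen3)
+
+A block stored by P can be loaded by D if and only if all of the following hold. The first three conditions (namespace, block granularity, hash algorithm; items 1 and 2) decide whether the query hits; the fourth (`total_slots`, item 3) is the only load guard the pegaflow engine enforces; the fifth (in-slot K/V layout, item 4) is not checked by the engine, and getting it wrong causes **silent data misalignment** (after pegaflow #382 the Contiguous←split combination is absorbed; every other combination still has to be aligned by hand):
+
+1. **Namespaces are equal.** `derive_namespace` = sha256(model/dtype/tp/pp/num_kv_heads/head_size/num_hidden_layers/cache_dtype/dcp/pcp/cross_layer_blocks/mla_layer_split)[:8]; **`block_size` is not one of the factors**. Both sides of the qwen3 stack = `cd6ed6c5`. The openinfer D side takes it as an explicit override and does not derive it itself.
+2. **Block granularity is equal + hash algorithm is equal.** All three openinfer chains (hash/seal/load) are anchored to the GPU page size (the compat hasher reads `budget.block_size` directly, remote block → GPU page is a 1:1 mapping with no re-slicing), and pegaflow does no cross-granularity re-slicing either. **Conclusion: P's `--block-size` yields to D's page size (qwen3 = 16), with zero changes to openinfer.** The root cause of the all-miss 5s waits in milestone 1's first round was exactly P=64 vs D=16: the hashes disagree from block 0. A query miss can only come from namespace/hash/owner-TTL, whereas a topology mismatch is a load-time error; do not conflate the two failure surfaces.
+3. **`total_slots` is equal** (layer-first = `num_layers × tp_size`). The vLLM fork defaults qwen3 to the v2 model runner, which registers 36 per-layer slots, naturally equal to D. **Cross-layer (1 slot) is a dead end**: it exists only in the v1 runner (`VLLM_USE_V2_MODEL_RUNNER=0`) and inevitably trips the slot-count guard against D's per-layer registration; once a pool is polluted so that both layouts coexist, even P itself can no longer load ("namespace is shared by incompatible KV layouts"), so switching layout configuration requires clearing the pool and restarting.
+4.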
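**K/V segment layout inside a slot.** vLLM FA NHD `(2, blocks, ...)` → the connector infers a K/V split into two segments; openinfer is page-first fused → a single contiguous `[K|V]` segment. pegaflow #382 lets the Contiguous device layout absorb the difference by splitting the copy into two per-segment copies (and adds a guard that the segment lengths must exactly tile the device block). GLM/MLA is a single-segment latent and does not have this problem.
+
+**The first gate (independent of any scheduler/polling change)**: vLLM-P stores the KV of a known prompt → openinfer-D queries with keys derived by the compat provider → it hits, and a **byte-for-byte comparison** of the MLA cache (656B layout) and the indexer K cache (DeepGEMM block-split layout) passes. This gate falsifies both risks at once: hash replication and byte layout. Use at least two prompt classes, **aligned (`% 64 == 0`) and unaligned**, so that tail-block key derivation and byte layout are covered by the gate from day one and not left to integration.
+
+### The tail-block problem: vLLM hashes only full pages, and D is forbidden any prefill
+
+vLLM generates block hashes only for full pages (64 tokens); the `prompt_len % 64` remainder is a partial block, which has a block_id but no hash and does not appear in the connector's save_intent. The longest prefix P stores into pegaflow = `floor(prompt_len/64)×64`, so after a hit D is still missing the KV for 0–63 tokens. For a GLM D node, filling in that tail = riding the decode kernels token by token (~50ms/step today, worst case ≈3s), and "a decode node doing prefill" pollutes scheduling. **Policy red line: D computes zero prompt positions.**
+
+**Decision: add an additive tail-block extension to the P-side connector save path (gated to P/D mode).** When the request finishes, derive a key for the partial block with the same hash function, `xxh3_128(cbor((last_full_hash, tail_token_ids, None)))` (vLLM does not compute this itself, but the function is well defined and the D-side compat provider can derive it too), and D2H that page along with the rest. Note the difference from the rejected "neutral key": the key scheme is still vLLM's native hash + D-side replication, and this is only a small extension of save coverage. It does not track vLLM internals and will not be invalidated by dynamo upstream alignment (upstream aligns on full-block hashes; the tail block is a gap vLLM never covered). When `prompt_len % 64 == 0` there is no tail block and the path degenerates.
+
+**First-token ownership** (the last shadow of prefill after the tail block):
+
+- **Variant A (chosen, D computes zero prompt positions)**: the router sends t1 from P's response straight to the client (TTFT = the time of P's response) and appends t1 to the context it forwards to D; D's first step = a true decode of t1 (all prompt KV is in place, and t1's KV is written by that step). Sampling consistency holds by construction.
+- Variant B (vLLM-style alternative): D recomputes a single-step forward of the last prompt token to produce t1 (the `num_computed_tokens -= 1` semantics of a vLLM full hit). Under greedy this matches P; with temp>0, P's t1 is void and P's one decode step is pure waste, which ultimately points to a prefill-only mode on the P side (the counterpart of openinfer #526).
+
+## 4.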
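Supplementary rulings (2026-07 discussion)
+
+- **P-side CPU pool capacity is not an issue**: P nodes have TB-scale CPU memory, and the design assumes evict-before-fetch does not happen; no protection-window mechanism is built, and it will be revisited if the problem appears.
+- **Failure semantics of the D no-prefill rule**: any miss/timeout returns 429/500 upstream (retryable); a router/client retry simply goes through P again (content addressing is naturally idempotent). No fancier re-dispatch mechanism.
+- **Multi-turn delta fetch**: covered automatically by pegaflow's content-addressed prefix matching (each turn P re-prefills the delta and registers it; D hits the old blocks by prefix and pulls only the new ones), with no new mechanism; already demonstrated in M2 (turn 2+ pulls only 64 blocks, ~15ms).
+- **TP→DP layer mapping**: the MLA latent is naturally TP-independent (a full copy on every rank), and the connector already has layer-split registration, so this is a non-issue.
+- **How GLM PR5 couples with P/D**: deferred until the GLM integration stage (see the §5 roadmap).
+
+## 5. Roadmap: qwen3 smoke test first, GLM afterwards
+
+**Milestone 1: P/D smoke test of vLLM-P + openinfer-D on qwen3. Done (2026-07, node 34).**
+
+| Step | Status |
+| --- | --- |
+| 1. hash + byte-layout compatibility gate | ✅ Verified in e2e form (§3.1 compatibility equations; 3 prompts being byte-identical is itself the layout gate) |
+| 2. vllm-hash-compat key provider | ✅ `openinfer-kv-offload/src/vllm_hash.rs`, golden-vector unit tests ×5 |
+| 3. Tail-block connector extension + router variant A | ⬜ Not done. D currently computes the tail as a fallback (≤block_size=16 tokens, acceptable on qwen3; must be done before GLM) |
+| 4. `RemoteFetch` wait branch | ✅ Re-query inside the miss window (the busy path also has a 5ms/request throttle) + a degradation warn when the window is exhausted + a breaker (after 3 consecutive requests miss through the whole window, new requests skip the wait; any remote hit re-arms it) + a startup fingerprint log (seed/namespace/block_size/NONE_HASH/geometry, compared by eye against the P side); the strict no-prefill 429/500 switch is not done (a miss currently falls back to scratch after the full window; output is correct and visible in the logs) |
+| 5.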
e2e smoke + delta re-check | ✅ 3 prompts BYTE-IDENTICAL; an aligned prompt pulls only 1 incremental block (content-addressed delta reuse) |
+
+**TTFT A/B** (unique-prefix cold runs, median of 3 samples, direct-to-P vs router P/D): 470 tok 34→48ms (**+14ms**); 1.8k tok 55→106ms (**+51ms**); 7k tok 204→351ms (+147ms, of which RDMA moves 992MiB in 42ms @23.7GiB/s, and the rest is P save D2H + discovery polling + H2D reload, all of which can be pipelined as a follow-up). ≤2k is within the 50ms budget.
+
+Environment and reruns: node 34 `/data/pd-stack/` (`stack.sh` starts/stops per component = pidfile + setsid process group, kills the whole group, and waits for GPU memory to clear; `smoke.sh` strictly checks HTTP 200 + JSON; `ttft.py` runs the A/B). openinfer branch `feat/pd-vllm-hash-compat`; pegaflow depends on [#382](https://github.com/novitalabs/pegaflow/pull/382) (the Contiguous←split load fix). **Restart D to clear its prefix cache before measuring P/D hits, otherwise you are measuring D's local cache** (the baseline request loads the prompt into D).
+
+**Milestone 2: GLM5.2 integration** (a separate effort, depends on PR5): GLM's paged-KV/scheduler/kvbm infrastructure + registration and byte gate for the two caches (656B MLA + indexer) + the GLM wait predicate (full prefix + no-prefill hard constraint). Whether the PR5 scheduler is designed P/D-ready will be decided then.
+
+Decode-side PR5 (scheduler + CUDA graph) continues in parallel as before; it is a prerequisite for milestone 2.
diff --git a/openinfer-kv-cache/src/pool.rs b/openinfer-kv-cache/src/pool.rs
index 96822f80..6a0e490c 100644
--- a/openinfer-kv-cache/src/pool.rs
+++ b/openinfer-kv-cache/src/pool.rs
@@ -253,6 +253,13 @@ impl PrefixProbe {
             .map(|h| sequence_hash_bytes(h).to_vec())
             .collect()
     }
+
+    /// Number of blocks [`Self::cpu_query_hashes`] covers, without
+    /// materializing the hash bytes. Callers substituting their own key
+    /// scheme (vLLM-compat P/D) slice their chain to exactly this window.
+    pub fn cpu_query_window(&self) -> usize {
+        self.cacheable.saturating_sub(self.gpu_hit)
+    }
 }
 
 /// An opaque strong pin on one registered KV block.
While held it keeps the diff --git a/openinfer-kv-offload/Cargo.toml b/openinfer-kv-offload/Cargo.toml index 8046bcab..2d82e25b 100644 --- a/openinfer-kv-offload/Cargo.toml +++ b/openinfer-kv-offload/Cargo.toml @@ -22,6 +22,7 @@ half = { workspace = true } log = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } +xxhash-rust = { workspace = true } [lints] workspace = true diff --git a/openinfer-kv-offload/src/lib.rs b/openinfer-kv-offload/src/lib.rs index 9b769c40..2d353bba 100644 --- a/openinfer-kv-offload/src/lib.rs +++ b/openinfer-kv-offload/src/lib.rs @@ -14,8 +14,10 @@ //! polls its [`LoadHandle`] each scheduler tick. mod engine; +mod vllm_hash; pub use engine::{LoadHandle, OffloadConfig, OffloadEngine, P2pConfig, QueryHit, QueryOutcome}; +pub use vllm_hash::{VLLM_HASH_BYTES, VllmBlockHasher}; // Re-exported so callers name pegaflow's engine types through this bridge. pub use pegaflow_core::{EngineError, PegaEngine, QueryLeaseId}; diff --git a/openinfer-kv-offload/src/vllm_hash.rs b/openinfer-kv-offload/src/vllm_hash.rs new file mode 100644 index 00000000..86ecc66e --- /dev/null +++ b/openinfer-kv-offload/src/vllm_hash.rs @@ -0,0 +1,212 @@ +//! vLLM-compatible block-hash derivation for cross-engine P/D KV lookup. +//! +//! When vLLM is the prefill node, its pegaflow connector registers KV blocks +//! under vLLM's own prefix-cache hashes (`vllm/v1/core/kv_cache_utils.py:: +//! hash_block_tokens`). For an openinfer decode node to find those blocks by +//! content, it must derive byte-identical keys from the same token sequence. +//! This module mirrors exactly one vLLM configuration — the only one that is +//! replicable outside Python: +//! +//! - `--prefix-caching-hash-algo xxhash_cbor`: key = `xxh3_128(cbor(input))` +//! with canonical CBOR (RFC 8949). The non-`_cbor` variants serialize via +//! pickle and cannot be reproduced here. +//! 
- `PYTHONHASHSEED` set on every vLLM process: the chain root `NONE_HASH` is +//! `xxh3_128(cbor(seed_string))`. Unset, vLLM falls back to `os.urandom` and +//! keys are unreproducible across processes — deployment must fail fast. +//! - Text-only requests: vLLM's `extra_keys` (multimodal, LoRA, cache salt) +//! are `None`. Anything else diverges and must be caught by the compat gate. +//! +//! Per block the hashed input is the 3-tuple `(parent_hash: bytes, +//! token_ids: tuple[int, ...], None)`, encoded as a CBOR array. vLLM only +//! hashes full blocks; the partial tail block has no vLLM hash. The P/D tail +//! extension applies the same function to the partial token list, which both +//! sides can derive (`docs/models/glm52/pd-vllm-prefill.md` §3). + +use xxhash_rust::xxh3::xxh3_128; + +/// Key width: xxh3_128 digest. +pub const VLLM_HASH_BYTES: usize = 16; + +/// Derives vLLM-compatible block-hash chains for one `(seed, block_size)` +/// configuration. +pub struct VllmBlockHasher { + none_hash: [u8; VLLM_HASH_BYTES], + block_size: usize, +} + +impl VllmBlockHasher { + /// `python_hash_seed` must equal the `PYTHONHASHSEED` value set on every + /// vLLM prefill process; `block_size` must equal vLLM's hash block size. + pub fn new(python_hash_seed: &str, block_size: usize) -> Self { + assert!(block_size > 0, "block_size must be positive"); + let mut seed_cbor = Vec::with_capacity(python_hash_seed.len() + 9); + write_head(&mut seed_cbor, MAJOR_TSTR, python_hash_seed.len() as u64); + seed_cbor.extend_from_slice(python_hash_seed.as_bytes()); + Self { + none_hash: xxh3_128(&seed_cbor).to_be_bytes(), + block_size, + } + } + + /// Hash one block: `parent` is the previous block's hash (`None` for the + /// first block, which chains off `NONE_HASH`). 
+    pub fn hash_block(
+        &self,
+        parent: Option<&[u8; VLLM_HASH_BYTES]>,
+        token_ids: &[u32],
+    ) -> [u8; VLLM_HASH_BYTES] {
+        let parent = parent.unwrap_or(&self.none_hash);
+        // array(3): [ bstr(parent), array(n)(uint...), null ]
+        let mut buf = Vec::with_capacity(2 + VLLM_HASH_BYTES + 2 + 9 + 5 * token_ids.len());
+        write_head(&mut buf, MAJOR_ARRAY, 3);
+        write_head(&mut buf, MAJOR_BSTR, VLLM_HASH_BYTES as u64);
+        buf.extend_from_slice(parent);
+        write_head(&mut buf, MAJOR_ARRAY, token_ids.len() as u64);
+        for &t in token_ids {
+            write_head(&mut buf, MAJOR_UINT, u64::from(t));
+        }
+        buf.push(CBOR_NULL);
+        xxh3_128(&buf).to_be_bytes()
+    }
+
+    /// Key chain for a token sequence: one key per full block. vLLM never
+    /// hashes the partial tail block; when the P/D tail extension lands it
+    /// derives the extra key via [`Self::hash_block`] on the tail slice.
+    pub fn key_chain(&self, token_ids: &[u32]) -> Vec<Vec<u8>> {
+        let full = token_ids.len() / self.block_size;
+        let mut keys = Vec::with_capacity(full);
+        let mut parent: Option<[u8; VLLM_HASH_BYTES]> = None;
+        for chunk in token_ids.chunks_exact(self.block_size) {
+            let h = self.hash_block(parent.as_ref(), chunk);
+            keys.push(h.to_vec());
+            parent = Some(h);
+        }
+        keys
+    }
+
+    /// The chain root (`xxh3_128(cbor(seed_string))`), for startup-time
+    /// fingerprint logging: the P and D values must match byte-for-byte.
+    pub fn none_hash(&self) -> [u8; VLLM_HASH_BYTES] {
+        self.none_hash
+    }
+}
+
+const MAJOR_UINT: u8 = 0;
+const MAJOR_BSTR: u8 = 2;
+const MAJOR_TSTR: u8 = 3;
+const MAJOR_ARRAY: u8 = 4;
+const CBOR_NULL: u8 = 0xf6;
+
+/// Canonical (minimal-length) CBOR head: major type + unsigned argument.
+fn write_head(out: &mut Vec<u8>, major: u8, value: u64) {
+    let m = major << 5;
+    match value {
+        0..=23 => out.push(m | value as u8),
+        24..=0xff => {
+            out.push(m | 24);
+            out.push(value as u8);
+        }
+        0x100..=0xffff => {
+            out.push(m | 25);
+            out.extend_from_slice(&(value as u16).to_be_bytes());
+        }
+        0x1_0000..=0xffff_ffff => {
+            out.push(m | 26);
+            out.extend_from_slice(&(value as u32).to_be_bytes());
+        }
+        _ => {
+            out.push(m | 27);
+            out.extend_from_slice(&value.to_be_bytes());
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Golden vectors captured from vLLM (pegaflow .venv, 2026-07-03):
+    //   PYTHONHASHSEED= python -c "init_none_hash(xxhash_cbor); ..."
+    // See docs/models/glm52/pd-vllm-prefill.md §3.
+
+    fn hex(bytes: &[u8]) -> String {
+        bytes.iter().map(|b| format!("{b:02x}")).collect()
+    }
+
+    #[test]
+    fn none_hash_matches_vllm() {
+        assert_eq!(
+            hex(&VllmBlockHasher::new("0", 4).none_hash),
+            "1ebe36576dcb573f26b99533a20aaeca"
+        );
+        assert_eq!(
+            hex(&VllmBlockHasher::new("42", 4).none_hash),
+            "a9d2e63407fce1a26c9dc9d7fa2d7caf"
+        );
+    }
+
+    #[test]
+    fn cbor_encoding_matches_cbor2_canonical() {
+        // cbor2.dumps((NONE_HASH, (1,2,3,4), None), canonical=True)
+        let h = VllmBlockHasher::new("0", 4);
+        let mut buf = Vec::new();
+        write_head(&mut buf, MAJOR_ARRAY, 3);
+        write_head(&mut buf, MAJOR_BSTR, 16);
+        buf.extend_from_slice(&h.none_hash);
+        write_head(&mut buf, MAJOR_ARRAY, 4);
+        for t in 1u64..=4 {
+            write_head(&mut buf, MAJOR_UINT, t);
+        }
+        buf.push(CBOR_NULL);
+        assert_eq!(
+            hex(&buf),
+            "83501ebe36576dcb573f26b99533a20aaeca8401020304f6"
+        );
+        // Multi-byte uint arguments (token id 100000 → 0x1a be32).
+        let mut buf = Vec::new();
+        write_head(&mut buf, MAJOR_ARRAY, 3);
+        write_head(&mut buf, MAJOR_BSTR, 16);
+        buf.extend_from_slice(&h.none_hash);
+        write_head(&mut buf, MAJOR_ARRAY, 4);
+        for t in 100_000u64..100_004 {
+            write_head(&mut buf, MAJOR_UINT, t);
+        }
+        buf.push(CBOR_NULL);
+        assert_eq!(
+            hex(&buf),
+            "83501ebe36576dcb573f26b99533a20aaeca841a000186a01a000186a11a000186a21a000186a3f6"
+        );
+    }
+
+    #[test]
+    fn block_chain_matches_vllm() {
+        let h = VllmBlockHasher::new("0", 4);
+        let b1 = h.hash_block(None, &[1, 2, 3, 4]);
+        let b2 = h.hash_block(Some(&b1), &[5, 6, 7, 8]);
+        let tail = h.hash_block(Some(&b2), &[9, 10]);
+        assert_eq!(hex(&b1), "0a8577df5ee3430515a8cc1f6e3ac52e");
+        assert_eq!(hex(&b2), "d152782cb4d753bde718a811a3b75e23");
+        assert_eq!(hex(&tail), "7157acec76700e416a50d67e2334f6f6");
+    }
+
+    #[test]
+    fn realistic_block_size_matches_vllm() {
+        let h = VllmBlockHasher::new("0", 64);
+        let tokens: Vec<u32> = (100_000..100_064).collect();
+        let big = h.hash_block(None, &tokens);
+        assert_eq!(hex(&big), "6e43e6e613b5ab91b1c6332dca7020f7");
+    }
+
+    #[test]
+    fn key_chain_covers_full_blocks_only() {
+        let h = VllmBlockHasher::new("0", 4);
+        let tokens: Vec<u32> = (1..=10).collect();
+        let keys = h.key_chain(&tokens);
+        assert_eq!(keys.len(), 2, "partial tail block must not be keyed");
+        assert_eq!(hex(&keys[0]), "0a8577df5ee3430515a8cc1f6e3ac52e");
+        assert_eq!(hex(&keys[1]), "d152782cb4d753bde718a811a3b75e23");
+
+        let aligned: Vec<u32> = (1..=8).collect();
+        assert_eq!(h.key_chain(&aligned), keys[..2]);
+    }
+}
diff --git a/openinfer-qwen3/src/executor.rs b/openinfer-qwen3/src/executor.rs
index f441c622..e0d622c7 100644
--- a/openinfer-qwen3/src/executor.rs
+++ b/openinfer-qwen3/src/executor.rs
@@ -1,7 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::thread;
 
-use anyhow::Result;
+use anyhow::{Result, ensure};
 use crossbeam_channel as channel;
 
 use crate::batch_decode_buffers::{BATCH_BUCKETS, BatchDecodeBuffers};
@@ -915,6 +915,11 @@ pub struct
Qwen3Executor {
     /// saves + MetaServer registrations are peer-visible, so the HTTP response
     /// doubles as the KV-ready signal (see `Qwen3P2pOptions::flush_on_finish`).
     flush_offload_on_finish: bool,
+    /// P/D decode role with a vLLM prefill peer: offload query keys derive
+    /// with vLLM's hash scheme, a zero hit waits out the producer's
+    /// registration tail, and self-saves are skipped (this node's kvbm keys
+    /// would be unfindable in the vLLM-keyed content domain).
+    vllm_compat: Option<VllmCompatState>,
     /// Green Context SM partition for concurrent prefill/decode. `None` when
     /// disabled (default) or when the GPU does not support Green Contexts.
     overlap: Option,
@@ -979,6 +984,17 @@ enum PrefetchPhase {
     RemoteFetch {
         query_hashes: Vec<Vec<u8>>,
         deadline: std::time::Instant,
+        /// vLLM-compat P/D handoff race guard: until this instant a zero hit
+        /// keeps the request parked (the producer's registration hasn't
+        /// landed yet) instead of degrading to prefill-from-scratch. Set to
+        /// the park time (i.e. already expired) outside vLLM-compat mode.
+        miss_deadline: std::time::Instant,
+        /// When the request was parked, for the degradation warning.
+        parked_at: std::time::Instant,
+        /// Last re-query instant: ticks inside [`REMOTE_REQUERY_INTERVAL`]
+        /// skip the RPC so N parked requests cannot turn every scheduler
+        /// tick into N serial MetaServer round-trips.
+        last_query: std::time::Instant,
     },
     /// Host→GPU DMA into reserved local blocks is in flight.
     Loading {
@@ -996,6 +1012,30 @@ enum PrefetchPhase {
 /// pegaflow's own fetch timeout into a plain local hit count well before this.
 const REMOTE_FETCH_DEADLINE: std::time::Duration = std::time::Duration::from_secs(15);
 
+/// Minimum spacing between re-query RPCs for one parked request. The idle
+/// scheduler loop already throttles at ~5ms; this bounds the busy path too,
+/// where decode ticks can come faster than the RPC is worth repeating.
+const REMOTE_REQUERY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5); + +/// vLLM-compat miss breaker: after this many consecutive requests each +/// exhausted the whole zero-hit wait window, new requests skip the wait (the +/// prefill peer is evidently not publishing — misconfig or down) instead of +/// taxing every cold request the full window. Any remote hit re-arms waiting. +const MISS_BREAKER_THRESHOLD: u32 = 3; + +/// vLLM-compat P/D mode, derived from [`crate::Qwen3VllmCompatOptions`] at +/// executor build time (the hasher needs the resolved KV block size). +struct VllmCompatState { + hasher: openinfer_kv_offload::VllmBlockHasher, + /// Zero-hit wait window: how long a cold request re-queries before + /// giving up on the expected remote KV (see `RemoteFetch::miss_deadline`). + miss_wait: std::time::Duration, + /// Requests in a row that exhausted the whole wait window with zero hits. + /// At [`MISS_BREAKER_THRESHOLD`] the breaker opens and new requests skip + /// the wait; any remote hit resets it. + consecutive_miss_windows: u32, +} + impl Qwen3Executor { pub(crate) fn single( model: Qwen3Model, @@ -1049,6 +1089,47 @@ impl Qwen3Executor { let offload = build_offload(offload_opts, &kv_mgr, model.config(), model.device_ctx())?; let total_blocks = kv_mgr.pool().total_blocks(); let padding_block_id = kv_mgr.pool().padding_block_id(); + let vllm_compat = match offload_opts.vllm_compat.as_ref() { + None => None, + Some(c) => { + ensure!( + c.miss_wait < REMOTE_FETCH_DEADLINE, + "kv-pd miss wait ({:?}) must stay below the {:?} remote-fetch \ + deadline, which would otherwise silently cap it", + c.miss_wait, + REMOTE_FETCH_DEADLINE, + ); + let hasher = openinfer_kv_offload::VllmBlockHasher::new( + &c.python_hash_seed, + budget.block_size, + ); + // Cross-engine fingerprint. 
Every P/D mismatch (seed,
+                // namespace, block size, geometry) otherwise presents as
+                // nothing but slow cold requests; this line is what an
+                // operator diffs against the vLLM peer's startup config.
+                log::info!(
+                    "vLLM-compat P/D active: seed={} namespace={} block_size={} \
+                     none_hash={} layers={} kv_heads={} head_dim={} miss_wait={:?}",
+                    c.python_hash_seed,
+                    c.namespace,
+                    budget.block_size,
+                    hasher
+                        .none_hash()
+                        .iter()
+                        .map(|b| format!("{b:02x}"))
+                        .collect::<String>(),
+                    budget.num_layers,
+                    budget.num_kv_heads,
+                    budget.head_dim,
+                    c.miss_wait,
+                );
+                Some(VllmCompatState {
+                    hasher,
+                    miss_wait: c.miss_wait,
+                    consecutive_miss_windows: 0,
+                })
+            }
+        };
         Ok(Self {
             metadata,
             kv_mgr,
@@ -1077,6 +1158,7 @@ impl Qwen3Executor {
                 .p2p
                 .as_ref()
                 .is_some_and(|p2p| p2p.flush_on_finish),
+            vllm_compat,
             overlap: None,
             async_prefill: None,
             speculative: None,
@@ -1321,6 +1403,7 @@ impl Qwen3Executor {
             prefetch: HashMap::new(),
             l1_retention_disabled: false,
             flush_offload_on_finish: false,
+            vllm_compat: None,
             overlap: None,
             async_prefill: None,
             speculative: None,
@@ -1500,6 +1583,13 @@ impl Qwen3Executor {
         if self.offload.is_none() {
             return;
         }
+        if self.vllm_compat.is_some() {
+            // The content domain is keyed with vLLM's hash scheme; this node's
+            // kvbm-keyed self-saves would be unfindable there. Remote blocks
+            // it fetched are already host-cached (under the vLLM keys) by
+            // pegaflow's read path, so multi-turn reuse doesn't need them.
+ return; + } let Some(rkv) = self.request_kvs.get(&request_id) else { return; }; @@ -1637,25 +1727,51 @@ impl Qwen3Executor { let PrefetchPhase::RemoteFetch { query_hashes, deadline, + miss_deadline, + parked_at, + last_query, } = &st.phase else { return false; }; - let timed_out = std::time::Instant::now() > *deadline; + let now = std::time::Instant::now(); + let timed_out = now > *deadline; if timed_out { log::warn!("remote KV fetch timed out for {id:?}; prefill from scratch"); } + if !timed_out && now.duration_since(*last_query) < REMOTE_REQUERY_INTERVAL { + return false; // stay parked; too soon for another MetaServer RPC + } + // The breaker cuts already-parked waiters short too: a request can + // enter this phase past the breaker via a transient Loading answer, + // and "the peer is evidently not publishing" applies to it as well. + let wait_on_miss = self + .vllm_compat + .as_ref() + .is_some_and(|c| c.consecutive_miss_windows < MISS_BREAKER_THRESHOLD) + && now <= *miss_deadline; + let miss_deadline = *miss_deadline; + let parked_for = now.duration_since(*parked_at); + let queried_blocks = query_hashes.len(); let query_hashes = query_hashes.clone(); + if let Some(st) = self.prefetch.get_mut(&id) + && let PrefetchPhase::RemoteFetch { last_query, .. 
} = &mut st.phase + { + *last_query = now; + } let available_blocks = self.kv_mgr.pool().available_blocks(); + let mut query_errored = false; let action = { let offload = self.offload.as_ref().expect("offload present in prefetch"); remote_fetch_action( timed_out, + wait_on_miss, || { offload .query(&id.0.to_string(), &query_hashes) .map(QueryView::from) .map_err(|e| { + query_errored = true; log::warn!( "remote KV re-query failed for {id:?} (prefill from scratch): {e}" ); @@ -1668,16 +1784,38 @@ impl Qwen3Executor { match action { RemoteFetchAction::Wait => false, RemoteFetchAction::Scratch => { + // A vLLM-compat request that waited out the whole miss window + // is the sole symptom of every P/D misconfiguration (seed, + // namespace, block size, peer down) — never degrade silently. + // The 15s hard timeout (a Loading-stuck peer) counts toward + // the breaker too, with its own warning already emitted. + // Requests cut short by an open breaker (now before the + // deadline) scratch quietly: the breaker warning already + // announced the mode. + let window_exhausted = self.vllm_compat.is_some() + && (timed_out || (!query_errored && now > miss_deadline)); + if window_exhausted { + if !timed_out { + log::warn!( + "expected remote KV never appeared for {id:?} \ + ({queried_blocks} blocks, waited {parked_for:?}); prefill from \ + scratch — check P/D seed/namespace/block-size alignment" + ); + } + self.note_miss_window_exhausted(); + } self.prefetch.remove(&id); true } RemoteFetchAction::Release(lease) => { + self.note_remote_hit(); let offload = self.offload.as_ref().expect("offload present in prefetch"); offload.release_query_lease(lease); self.prefetch.remove(&id); true } RemoteFetchAction::Load(lease, num_blocks) => { + self.note_remote_hit(); let probe = self .prefetch .remove(&id) @@ -1691,6 +1829,31 @@ impl Qwen3Executor { } } + /// Miss-breaker bookkeeping: one more request exhausted the whole + /// zero-hit wait window. 
At the threshold the breaker opens and + /// [`Self::begin_kv_prefetch`] stops parking new requests. + fn note_miss_window_exhausted(&mut self) { + let Some(compat) = self.vllm_compat.as_mut() else { + return; + }; + compat.consecutive_miss_windows = compat.consecutive_miss_windows.saturating_add(1); + if compat.consecutive_miss_windows == MISS_BREAKER_THRESHOLD { + log::warn!( + "P/D miss breaker open: {MISS_BREAKER_THRESHOLD} consecutive requests \ + exhausted the remote-KV wait window; new requests prefill from scratch \ + immediately until a remote hit lands" + ); + } + } + + /// Miss-breaker bookkeeping: remote content is visible again (leased + /// hit), so cold requests may wait on the P/D handoff race once more. + fn note_remote_hit(&mut self) { + if let Some(compat) = self.vllm_compat.as_mut() { + compat.consecutive_miss_windows = 0; + } + } + /// Reserve GPU destination blocks for a leased host-tier hit, submit the /// H2D load, and park the request as a `Loading` prefetch (taking ownership /// of `probe`, which keeps the GPU-hit prefix resident meanwhile). @@ -1823,16 +1986,23 @@ fn build_offload( // mixed mesh silently feed one model the other's KV. hidden_size + // intermediate_size + vocab_size discriminate the model line's sizes; // the layout fields pin the block geometry the transfer relies on. - let namespace = format!( - "openinfer-qwen3-hs{}-is{}-v{}-l{}h{}d{}p{}", - config.hidden_size, - config.intermediate_size, - config.vocab_size, - layout.num_layers, - layout.num_kv_heads, - layout.head_dim, - layout.page_size - ); + // vLLM-compat mode joins the *P side's* content domain instead: the + // pegaflow connector derives an 8-hex namespace from vLLM config (and logs + // it at startup); reproducing that derivation would mean chasing Python + // repr of vLLM internals, so the operator passes it through explicitly. 
+ let namespace = match &opts.vllm_compat { + Some(compat) => compat.namespace.clone(), + None => format!( + "openinfer-qwen3-hs{}-is{}-v{}-l{}h{}d{}p{}", + config.hidden_size, + config.intermediate_size, + config.vocab_size, + layout.num_layers, + layout.num_kv_heads, + layout.head_dim, + layout.page_size + ), + }; let mut config = OffloadConfig::new( format!("qwen3-dev{device_id}"), device_id, @@ -2008,17 +2178,45 @@ impl ModelExecutor for Qwen3Executor { // keep their reserved blocks, so this never touches live KV. self.kv_mgr.pool().evict_inactive(); } + if self.vllm_compat.is_some() && lora_adapter.is_some() { + // vLLM salts LoRA block hashes via extra_keys; that derivation is + // not replicated, so LoRA requests skip the cross-engine lookup. + return false; + } let probe = self .kv_mgr .pool() .probe_prefix(prompt_tokens.to_vec(), lora_adapter); - let query_hashes = probe.cpu_query_hashes(); + let query_hashes = match &self.vllm_compat { + None => probe.cpu_query_hashes(), + // Same query window ([gpu_hit .. cacheable) blocks of the prompt), + // keyed with vLLM's hash scheme so the lookup can find what the + // vLLM prefill peer registered. Local GPU-tier naming stays kvbm: + // the loaded bytes are committed under the probe's own hashes. + Some(compat) => { + let window = probe.cpu_query_window(); + let start = probe.gpu_hit_blocks(); + // In bounds by construction: the probe's reuse cap leaves the + // prompt's final token out, so start + window ≤ ⌊len/bs⌋ = + // chain.len() even for block-aligned prompts. + let chain = compat.hasher.key_chain(prompt_tokens); + chain[start..start + window].to_vec() + } + }; if query_hashes.is_empty() { return false; } + // Breaker open: the peer demonstrably isn't publishing, so treat a + // zero hit as a plain miss instead of parking for the whole window. + // The first-shot query below still runs — a hit re-arms waiting. 
+ let expect_remote = self + .vllm_compat + .as_ref() + .is_some_and(|c| c.consecutive_miss_windows < MISS_BREAKER_THRESHOLD); let available_blocks = self.kv_mgr.pool().available_blocks(); let action = remote_fetch_action( false, + expect_remote, || { offload .query(&request_id.0.to_string(), &query_hashes) @@ -2033,15 +2231,25 @@ impl ModelExecutor for Qwen3Executor { match action { RemoteFetchAction::Wait => { // pegaflow is pulling the missing prefix from a P2P peer (or - // SSD) into the local host tier. Park the request and re-query - // each tick; the probe keeps the GPU-hit prefix resident. + // SSD) into the local host tier — or, in vLLM-compat mode, the + // producer's registration hasn't landed yet. Park the request + // and re-query each tick; the probe keeps the GPU-hit prefix + // resident. + let now = std::time::Instant::now(); + let miss_wait = self + .vllm_compat + .as_ref() + .map_or(std::time::Duration::ZERO, |c| c.miss_wait); self.prefetch.insert( request_id, PrefetchState { probe, phase: PrefetchPhase::RemoteFetch { query_hashes, - deadline: std::time::Instant::now() + REMOTE_FETCH_DEADLINE, + deadline: now + REMOTE_FETCH_DEADLINE, + miss_deadline: now + miss_wait, + parked_at: now, + last_query: now, }, }, ); @@ -2049,10 +2257,13 @@ impl ModelExecutor for Qwen3Executor { } RemoteFetchAction::Scratch => false, // miss or query error RemoteFetchAction::Release(lease) => { + self.note_remote_hit(); + let offload = self.offload.as_ref().expect("offload checked above"); offload.release_query_lease(lease); false } RemoteFetchAction::Load(lease, num_blocks) => { + self.note_remote_hit(); match self.start_prefetch_load(request_id, probe, lease, num_blocks) { Ok(()) => true, Err(()) => false, diff --git a/openinfer-qwen3/src/executor/remote_fetch.rs b/openinfer-qwen3/src/executor/remote_fetch.rs index 9151d863..80c54b9e 100644 --- a/openinfer-qwen3/src/executor/remote_fetch.rs +++ b/openinfer-qwen3/src/executor/remote_fetch.rs @@ -61,8 +61,17 @@ 
pub(super) enum RemoteFetchAction { /// this takes a closure rather than a pre-computed outcome. A query `Err` is /// already logged at the call site's level of context, so it folds into /// [`RemoteFetchAction::Scratch`] here. +/// +/// `wait_on_miss` is the P/D handoff race guard: when the prefill peer's KV +/// is *expected* (vLLM-prefill mode), a zero-hit means "the producer's save +/// or MetaServer registration hasn't landed yet", not "nobody has it" — so +/// the request stays parked and re-queries instead of prefilling from +/// scratch. The caller bounds it with a miss deadline (passing `false` once +/// that window closes); a query error still folds to `Scratch` — a broken +/// local engine won't heal by waiting. pub(super) fn remote_fetch_action( timed_out: bool, + wait_on_miss: bool, query: impl FnOnce() -> Result, E>, available_blocks: usize, reserve_floor: usize, @@ -76,7 +85,11 @@ pub(super) fn remote_fetch_action( Err(_) => return RemoteFetchAction::Scratch, }; let Some(lease) = lease else { - return RemoteFetchAction::Scratch; // miss + return if wait_on_miss { + RemoteFetchAction::Wait // producer's registration not visible yet + } else { + RemoteFetchAction::Scratch // miss + }; }; // Blocks promised to admitted requests are off-limits: reserving into // them makes a later prefill chunk or decode growth fail allocation. @@ -102,6 +115,7 @@ mod tests { fn timeout_gives_up_without_querying() { let action = remote_fetch_action::( true, + false, || unreachable!("post-deadline tick must not query"), usize::MAX, 0, @@ -112,7 +126,7 @@ mod tests { /// A remote fetch still in flight keeps the request parked. #[test] fn loading_waits() { - let action = remote_fetch_action::(false, || Ok(QueryView::Loading), 0, 0); + let action = remote_fetch_action::(false, false, || Ok(QueryView::Loading), 0, 0); assert_eq!(action, Action::Wait); } @@ -120,14 +134,14 @@ mod tests { /// the request just prefills from scratch. 
#[test] fn zero_hit_prefills_from_scratch() { - let action = remote_fetch_action(false, || ready(None, 0), usize::MAX, 0); + let action = remote_fetch_action(false, false, || ready(None, 0), usize::MAX, 0); assert_eq!(action, Action::Scratch); } /// A query error folds into prefill-from-scratch. #[test] fn query_error_prefills_from_scratch() { - let action = remote_fetch_action::(false, || Err("rpc failed"), usize::MAX, 0); + let action = remote_fetch_action::(false, false, || Err("rpc failed"), usize::MAX, 0); assert_eq!(action, Action::Scratch); } @@ -135,28 +149,66 @@ mod tests { /// for release — the type makes dropping it silently unrepresentable. #[test] fn budget_guard_releases_the_lease() { - let action = remote_fetch_action(false, || ready(Some(7), 10), 12, 3); + let action = remote_fetch_action(false, false, || ready(Some(7), 10), 12, 3); assert_eq!(action, Action::Release(7)); } /// `reserve_floor > available_blocks` must saturate, not underflow. #[test] fn budget_guard_saturates_when_floor_exceeds_available() { - let action = remote_fetch_action(false, || ready(Some(7), 1), 2, 5); + let action = remote_fetch_action(false, false, || ready(Some(7), 1), 2, 5); assert_eq!(action, Action::Release(7)); } /// Exactly-at-budget reserves and loads: the guard is strict-less-than. #[test] fn exact_budget_boundary_loads() { - let action = remote_fetch_action(false, || ready(Some(7), 9), 12, 3); + let action = remote_fetch_action(false, false, || ready(Some(7), 9), 12, 3); assert_eq!(action, Action::Load(7, 9)); } /// The normal path: leased hit within budget starts the H2D load. #[test] fn leased_hit_within_budget_loads() { - let action = remote_fetch_action(false, || ready(Some(42), 4), 64, 8); + let action = remote_fetch_action(false, false, || ready(Some(42), 4), 64, 8); assert_eq!(action, Action::Load(42, 4)); } + + /// P/D handoff race: a zero-hit with remote KV expected keeps the request + /// parked — the producer's registration hasn't landed yet. 
+ #[test] + fn expected_remote_miss_waits() { + let action = remote_fetch_action(false, true, || ready(None, 0), usize::MAX, 0); + assert_eq!(action, Action::Wait); + } + + /// Once the miss window closes the caller passes `wait_on_miss = false` + /// and a still-missing prefix degrades to prefill-from-scratch. + #[test] + fn expected_remote_miss_window_closed_prefills_from_scratch() { + let action = remote_fetch_action(false, false, || ready(None, 0), usize::MAX, 0); + assert_eq!(action, Action::Scratch); + } + + /// `wait_on_miss` guards only the miss branch: a query error is a broken + /// local engine, not a registration race, and never waits. + #[test] + fn query_error_never_waits_even_when_remote_expected() { + let action = + remote_fetch_action::(false, true, || Err("rpc failed"), usize::MAX, 0); + assert_eq!(action, Action::Scratch); + } + + /// `wait_on_miss` does not bypass the hard deadline. + #[test] + fn timeout_overrides_wait_on_miss() { + let action = remote_fetch_action::( + true, + true, + || unreachable!("post-deadline tick must not query"), + usize::MAX, + 0, + ); + assert_eq!(action, Action::Scratch); + } } diff --git a/openinfer-qwen3/src/lib.rs b/openinfer-qwen3/src/lib.rs index cec8010e..754017d0 100644 --- a/openinfer-qwen3/src/lib.rs +++ b/openinfer-qwen3/src/lib.rs @@ -94,6 +94,10 @@ pub struct Qwen3OffloadOptions { /// MetaServer, peers pull missing prefixes over RDMA, and this engine /// serves theirs. The P/D disaggregation data plane. pub p2p: Option, + /// `Some` when the P/D prefill peer is vLLM (pegaflow connector): offload + /// query keys switch from kvbm lineage hashes to vLLM's prefix-cache hash + /// scheme so this decode node can find the blocks vLLM registered. + pub vllm_compat: Option<Qwen3VllmCompatOptions>, } /// Cross-instance P2P KV sharing (see `openinfer_kv_offload::P2pConfig`).
@@ -115,6 +119,31 @@ pub struct Qwen3P2pOptions { pub flush_on_finish: bool, } +/// Decode-node settings for a P/D deployment whose prefill node is vLLM with +/// the pegaflow connector. vLLM registers KV under its own prefix-cache block +/// hashes (`xxh3_128` over canonical-CBOR chained tuples — see +/// `openinfer_kv_offload::VllmBlockHasher`); with this set, cold-request +/// offload queries derive those keys instead of kvbm lineage hashes, and a +/// zero hit waits out the producer's save/registration tail instead of +/// immediately prefilling from scratch. +/// +/// Requires on every vLLM prefill process: `--prefix-caching-hash-algo +/// xxhash_cbor` and `PYTHONHASHSEED` set to `python_hash_seed` (unset, vLLM's +/// chain root is `os.urandom` — unreproducible across processes). +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Qwen3VllmCompatOptions { + /// The `PYTHONHASHSEED` value shared with every vLLM prefill process. + pub python_hash_seed: String, + /// The P side's pegaflow-connector namespace: an 8-hex digest the + /// connector derives from vLLM config and logs at startup + /// (`namespace=...`). Both sides must address the same content domain. + pub namespace: String, + /// How long a cold request keeps re-querying a zero hit before giving up + /// on the expected remote KV and prefilling locally. Covers the P side's + /// post-response save + MetaServer-registration tail (tens of ms). + pub miss_wait: std::time::Duration, +} + impl Qwen3OffloadOptions { /// 8 GiB host tier — a few thousand dense Qwen3-4B blocks. 
pub const DEFAULT_PINNED_POOL_BYTES: usize = 8 << 30; @@ -124,6 +153,7 @@ impl Qwen3OffloadOptions { enabled: false, pinned_pool_bytes: 0, p2p: None, + vllm_compat: None, } } @@ -132,6 +162,7 @@ impl Qwen3OffloadOptions { enabled: true, pinned_pool_bytes, p2p: None, + vllm_compat: None, } } @@ -140,6 +171,12 @@ impl Qwen3OffloadOptions { self.p2p = Some(p2p); self } + + #[must_use] + pub fn with_vllm_compat(mut self, compat: Qwen3VllmCompatOptions) -> Self { + self.vllm_compat = Some(compat); + self + } } impl Default for Qwen3OffloadOptions { diff --git a/openinfer-server/src/config.rs b/openinfer-server/src/config.rs index 44220c23..6de6784f 100644 --- a/openinfer-server/src/config.rs +++ b/openinfer-server/src/config.rs @@ -109,6 +109,30 @@ pub(crate) struct Args { #[arg(long, default_value_t = false, requires = "kv_p2p_metaserver_addr")] pub kv_p2p_flush_on_finish: bool, + /// P/D decode role with a vLLM prefill peer: the shared PYTHONHASHSEED + /// value set on every vLLM prefill process. Switches offload query keys to + /// vLLM's prefix-cache hash scheme (requires the P side to run + /// --prefix-caching-hash-algo xxhash_cbor) and makes a cold request wait + /// out the producer's registration tail instead of prefilling locally. + /// Requires --kv-pd-vllm-namespace and the P2P mesh flags. + #[arg(long, value_parser = parse_pythonhashseed, requires_all = ["kv_p2p_metaserver_addr", "kv_pd_vllm_namespace"])] + pub kv_pd_vllm_seed: Option<String>, + + /// The vLLM prefill peer's pegaflow-connector namespace (an 8-hex digest + /// the connector logs at startup as `namespace=...`). Both sides must + /// address the same content domain. The digest carries no model identity: + /// pointing a decode node at a different model's namespace (same + /// tokenizer, same geometry class) silently cross-loads foreign KV.
+ #[arg(long, value_parser = parse_pegaflow_namespace, requires = "kv_pd_vllm_seed")] + pub kv_pd_vllm_namespace: Option<String>, + + /// Zero-hit wait window for --kv-pd-vllm-seed mode, in milliseconds: how + /// long a cold request keeps re-querying before giving up on the expected + /// remote KV and prefilling locally. Must stay below the executor's 15s + /// remote-fetch deadline (enforced at engine startup). + #[arg(long, default_value_t = 5000, requires = "kv_pd_vllm_seed")] + pub kv_pd_miss_wait_ms: u64, + /// vLLM-style no-prefix-cache. Without --kv-offload it disables prefix /// matching outright (every prefill recomputes the full prompt). With /// --kv-offload it is the pure-L2 mode: no cross-request HBM reuse, so every @@ -311,6 +335,26 @@ pub(crate) fn parse_max_lora_rank_arg(value: &str) -> Result { } } +/// PYTHONHASHSEED as vLLM accepts it: a decimal integer in [0, 4294967295]. +/// An empty or malformed seed would derive a well-formed key space that can +/// never match the peer — a config error must fail here, not as slow requests. +fn parse_pythonhashseed(s: &str) -> Result<String, String> { + if s.parse::<u32>().is_err() || s.starts_with('+') { + return Err(format!( + "PYTHONHASHSEED must be a decimal integer in [0, 4294967295], got {s:?}" + )); + } + Ok(s.to_string()) +} + +/// A pegaflow namespace digest: exactly 8 lowercase hex chars.
+fn parse_pegaflow_namespace(s: &str) -> Result<String, String> { + if s.len() != 8 || !s.bytes().all(|b| b.is_ascii_hexdigit() && !b.is_ascii_uppercase()) { + return Err(format!("namespace must be an 8-char lowercase hex digest, got {s:?}")); + } + Ok(s.to_string()) +} + fn parse_lora_module_fields(name: &str, path: &str) -> Result { if name.is_empty() { return Err("--lora-modules name must not be empty".to_string()); diff --git a/openinfer-server/src/main.rs b/openinfer-server/src/main.rs index 5f7e6260..e9e5671b 100644 --- a/openinfer-server/src/main.rs +++ b/openinfer-server/src/main.rs @@ -168,6 +168,19 @@ fn load_engine(args: &Args, model_type: ModelType) -> anyhow::Result