diff --git a/ucm/integration/vllm/patch/0.11.0/vllm-adapt-load-failure.patch b/ucm/integration/vllm/patch/0.11.0/vllm-adapt-load-failure.patch new file mode 100644 index 000000000..b102fc254 --- /dev/null +++ b/ucm/integration/vllm/patch/0.11.0/vllm-adapt-load-failure.patch @@ -0,0 +1,478 @@ +From de6002da3f694496e6d527e22665a4bfebe6042c Mon Sep 17 00:00:00 2001 +From: fenghao +Date: Sat, 31 Jan 2026 01:13:17 -0800 +Subject: [PATCH] [Patch] UCM patch for vllm 0.11.0 with load failure + +--- + .../kv_transfer/kv_connector/utils.py | 3 + + .../kv_transfer/kv_connector/v1/base.py | 20 ++ + vllm/v1/core/block_pool.py | 2 +- + vllm/v1/core/sched/output.py | 2 + + vllm/v1/core/sched/scheduler.py | 195 ++++++++++++++++-- + vllm/v1/core/single_type_kv_cache_manager.py | 3 + + vllm/v1/outputs.py | 5 +- + vllm/v1/worker/gpu_model_runner.py | 14 +- + vllm/v1/worker/gpu_worker.py | 3 +- + .../worker/kv_connector_model_runner_mixin.py | 1 + + 10 files changed, 230 insertions(+), 18 deletions(-) + +diff --git a/vllm/distributed/kv_transfer/kv_connector/utils.py b/vllm/distributed/kv_transfer/kv_connector/utils.py +index efa4c9abf..4bef1bd23 100644 +--- a/vllm/distributed/kv_transfer/kv_connector/utils.py ++++ b/vllm/distributed/kv_transfer/kv_connector/utils.py +@@ -143,6 +143,7 @@ class KVOutputAggregator: + finished_sending = set[str]() + finished_recving = set[str]() + aggregated_kv_connector_stats = None ++ invalid_block_ids = set[int]() + for model_runner_output in outputs: + output = model_runner_output.kv_connector_output + if not output: +@@ -164,6 +165,7 @@ class KVOutputAggregator: + type(kv_connector_stats)) + aggregated_kv_connector_stats = \ + aggregated_kv_connector_stats.aggregate(kv_connector_stats) ++ invalid_block_ids |= output.invalid_block_ids + + # select output of the worker specified by output_rank + output = outputs[output_rank] +@@ -172,6 +174,7 @@ class KVOutputAggregator: + finished_sending=finished_sending or None, + finished_recving=finished_recving or None, + kv_connector_stats=aggregated_kv_connector_stats or None, ++ invalid_block_ids=invalid_block_ids, + ) + + return output +diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py +index 184d0a62f..0d71e4cc1 100644 +--- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py ++++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py +@@ -229,6 +229,26 @@ class KVConnectorBase_V1(ABC): + """ + return None, None + ++ def get_block_ids_with_load_errors(self) -> set[int]: ++ """ ++ Get the set of block IDs that failed to load. ++ ++ Returns: ++ Set of block IDs that encountered load errors. ++ Empty set if no load errors occurred. ++ ++ Notes: ++ - Applies to both sync- and async-loading requests. ++ - Async loading: failed blocks may be reported in any forward pass ++ up to and including the pass where the request ID is returned by ++ `get_finished()`. Even if failures occur, the request must still ++ be reported via `get_finished()`, and the failed block IDs must ++ appear here no later than that same pass. ++ - Sync loading: failed blocks should be reported in the forward ++ pass in which they are detected. ++ """ ++ return set() ++ + def shutdown(self): + """ + Shutdown the connector. This is called when the worker process +diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py +index 3cc738304..617a724a1 100644 +--- a/vllm/v1/core/block_pool.py ++++ b/vllm/v1/core/block_pool.py +@@ -211,7 +211,7 @@ class BlockPool: + block_size: Number of tokens in each block. + kv_cache_group_id: The id of the KV cache group. + """ +- if num_cached_blocks == num_full_blocks: ++ if num_cached_blocks >= num_full_blocks: + return + new_full_blocks = blocks[num_cached_blocks:num_full_blocks] + assert len(request.block_hashes) >= num_full_blocks +diff --git a/vllm/v1/core/sched/output.py b/vllm/v1/core/sched/output.py +index 209fc2a44..6874e713a 100644 +--- a/vllm/v1/core/sched/output.py ++++ b/vllm/v1/core/sched/output.py +@@ -101,6 +101,7 @@ class CachedRequestData: + new_token_ids: list[list[int]] + new_block_ids: list[Optional[tuple[list[int], ...]]] + num_computed_tokens: list[int] ++ num_output_tokens: list[int] + + @property + def num_reqs(self) -> int: +@@ -114,6 +115,7 @@ class CachedRequestData: + new_token_ids=[], + new_block_ids=[], + num_computed_tokens=[], ++ num_output_tokens=[], + ) + + +diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py +index 2b2cd63c2..ed560bfb1 100644 +--- a/vllm/v1/core/sched/scheduler.py ++++ b/vllm/v1/core/sched/scheduler.py +@@ -133,6 +133,7 @@ class Scheduler(SchedulerInterface): + + # KV Connector: requests in process of async KV loading or recving + self.finished_recving_kv_req_ids: set[str] = set() ++ self.failed_recving_kv_req_ids: set[str] = set() + + # Encoder-related. + # Calculate encoder cache size if applicable +@@ -669,6 +670,7 @@ class Scheduler(SchedulerInterface): + new_token_ids: list[list[int]] = [] + new_block_ids: list[Optional[tuple[list[int], ...]]] = [] + num_computed_tokens: list[int] = [] ++ num_output_tokens: list[int] = [] + + use_connector = self.connector is not None + for req in itertools.chain(running_reqs, resumed_reqs): +@@ -693,6 +695,7 @@ class Scheduler(SchedulerInterface): + new_block_ids.append( + req_to_new_blocks[req_id].get_block_ids(allow_none=True)) + num_computed_tokens.append(req.num_computed_tokens) ++ num_output_tokens.append(len(req.output_token_ids)) + # Because resumed_reqs is usually empty, it is more efficient to do + # in-place appending so that we don't need to allocate a new list. + resumed_from_preemption = [False] * len(running_reqs) +@@ -704,6 +707,7 @@ class Scheduler(SchedulerInterface): + new_token_ids=new_token_ids, + new_block_ids=new_block_ids, + num_computed_tokens=num_computed_tokens, ++ num_output_tokens=num_output_tokens, + ) + + def _try_schedule_encoder_inputs( +@@ -875,6 +879,15 @@ class Scheduler(SchedulerInterface): + spec_decoding_stats: Optional[SpecDecodingStats] = None + kv_connector_stats = (kv_connector_output.kv_connector_stats + if kv_connector_output else None) ++ ++ failed_kv_load_req_ids = None ++ if kv_connector_output and kv_connector_output.invalid_block_ids: ++ # These blocks contain externally computed tokens that failed to ++ # load. Identify affected requests and adjust their computed token ++ # count to trigger recomputation of the invalid blocks. ++ failed_kv_load_req_ids = self._handle_invalid_blocks( ++ kv_connector_output.invalid_block_ids ++ ) + + # NOTE(woosuk): As len(num_scheduled_tokens) can be up to 1K or more, + # the below loop can be a performance bottleneck. We should do our best +@@ -883,6 +896,9 @@ class Scheduler(SchedulerInterface): + stopped_preempted_reqs: set[Request] = set() + for req_id, num_tokens_scheduled in num_scheduled_tokens.items(): + assert num_tokens_scheduled > 0 ++ if failed_kv_load_req_ids and req_id in failed_kv_load_req_ids: ++ # Skip requests that were recovered from KV load failure ++ continue + request = self.requests.get(req_id) + if request is None: + # The request is already finished. This can happen if the +@@ -905,7 +921,10 @@ class Scheduler(SchedulerInterface): + # tokens and rejections. If some tokens are rejected, + # num_computed_tokens is decreased by the number of rejected + # tokens. +- request.num_computed_tokens -= num_rejected ++ if request.num_computed_tokens > 0: ++ request.num_computed_tokens -= num_rejected ++ if request.num_output_placeholders > 0: ++ request.num_output_placeholders -= num_rejected + spec_decoding_stats = self.make_spec_decoding_stats( + spec_decoding_stats, + num_draft_tokens=num_draft_tokens, +@@ -1250,18 +1269,31 @@ class Scheduler(SchedulerInterface): + if request.request_id not in self.finished_recving_kv_req_ids: + return False + +- # Now that the blocks are ready, actually cache them. +- (block_ids, ) = self.kv_cache_manager.get_block_ids(request.request_id) +- num_computed_tokens = len(block_ids) * self.block_size +- # Handle the case where num request tokens less than one block. +- num_computed_tokens = min(num_computed_tokens, request.num_tokens) +- if num_computed_tokens == request.num_tokens: +- num_computed_tokens -= 1 +- # This will cache the blocks iff caching is enabled. +- self.kv_cache_manager.cache_blocks(request, num_computed_tokens) ++ if request.request_id in self.failed_recving_kv_req_ids: ++ # Request had KV load failures; num_computed_tokens was already ++ # updated in _update_requests_with_invalid_blocks ++ if request.num_computed_tokens: ++ # Cache any valid computed tokens. ++ self.kv_cache_manager.cache_blocks(request, request.num_computed_tokens) ++ else: ++ # No valid computed tokens, release allocated blocks. ++ # There may be a local cache hit on retry. ++ self.kv_cache_manager.free(request) + +- # Update the request state for scheduling. +- request.num_computed_tokens = num_computed_tokens ++ self.failed_recving_kv_req_ids.remove(request.request_id) ++ else: ++ # Now that the blocks are ready, actually cache them. ++ (block_ids,) = self.kv_cache_manager.get_block_ids(request.request_id) ++ num_computed_tokens = len(block_ids) * self.block_size ++ # Handle the case where num request tokens less than one block. ++ num_computed_tokens = min(num_computed_tokens, request.num_tokens) ++ if num_computed_tokens == request.num_tokens: ++ num_computed_tokens -= 1 ++ # This will cache the blocks iff caching is enabled. ++ self.kv_cache_manager.cache_blocks(request, num_computed_tokens) ++ ++ # Update the request state for scheduling. ++ request.num_computed_tokens = num_computed_tokens + + # Return that we are ready. + self.finished_recving_kv_req_ids.remove(request.request_id) +@@ -1294,3 +1326,142 @@ class Scheduler(SchedulerInterface): + "but the request is already freed.", req_id) + else: + self._free_blocks(self.requests[req_id]) ++ ++ def _update_requests_with_invalid_blocks( ++ self, requests: Iterable[Request], invalid_block_ids: set[int] ++ ) -> tuple[set[str], int]: ++ """ ++ Identify and update requests affected by invalid KV cache blocks. ++ ++ This method scans the given requests, detects those with invalid blocks ++ and adjusts their `num_computed_tokens` to the longest valid prefix. ++ For observability, it also accumulates the total number of tokens that ++ will need to be recomputed across all affected requests. ++ ++ Args: ++ requests: The set of requests to scan for invalid blocks. ++ invalid_block_ids: IDs of invalid blocks. ++ ++ Returns: ++ tuple: ++ - affected_req_ids (set[str]): IDs of requests impacted by ++ invalid blocks. ++ - total_affected_tokens (int): Total number of tokens that must ++ be recomputed across all affected requests (for observability). ++ """ ++ affected_req_ids: set[str] = set() ++ total_affected_tokens = 0 ++ # If a block is invalid and shared by multiple requests in the batch, ++ # these requests must be rescheduled, but only the first will recompute ++ # it. This set tracks blocks already marked for recomputation. ++ marked_invalid_block_ids: set[int] = set() ++ for request in requests: ++ is_affected = False ++ marked_invalid_block = False ++ req_id = request.request_id ++ # TODO (davidb): add support for hybrid memory allocator ++ (req_block_ids,) = self.kv_cache_manager.get_block_ids(req_id) ++ # We iterate only over blocks that may contain externally computed ++ # tokens ++ if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: ++ # Async loading. If num_computed_tokens is set it implies we ++ # already processed some block failures for it in a prior step ++ req_num_computed_tokens = ( ++ request.num_computed_tokens ++ if req_id in self.failed_recving_kv_req_ids ++ else len(req_block_ids) * self.block_size ++ ) ++ else: ++ # Sync loading. num_computed_tokens includes new tokens ++ req_num_computed_tokens = request.num_cached_tokens ++ ++ req_num_computed_blocks = ( ++ req_num_computed_tokens + self.block_size - 1 ++ ) // self.block_size ++ for idx, block_id in zip(range(req_num_computed_blocks), req_block_ids): ++ if block_id not in invalid_block_ids: ++ continue ++ ++ is_affected = True ++ ++ if block_id in marked_invalid_block_ids: ++ # This invalid block is shared with a previous request ++ # and was already marked for recomputation. ++ # This means this request can still consider this block ++ # as computed when rescheduled. ++ # Currently this only applies to sync loading; Async ++ # loading does not yet support block sharing ++ continue ++ ++ marked_invalid_block_ids.add(block_id) ++ ++ if marked_invalid_block: ++ # This request has already marked an invalid block for ++ # recomputation and updated its num_computed_tokens. ++ continue ++ ++ marked_invalid_block = True ++ # Truncate the computed tokens at the first failed block ++ request.num_computed_tokens = idx * self.block_size ++ total_affected_tokens += ( ++ req_num_computed_tokens - request.num_computed_tokens ++ ) ++ ++ if is_affected: ++ if not marked_invalid_block: ++ # All invalid blocks of this request are shared with ++ # previous requests and will be recomputed by them. ++ # Revert to considering only cached tokens as computed. ++ # Currently this only applies to sync loading; Async ++ # loading does not yet support block sharing ++ total_affected_tokens += ( ++ request.num_computed_tokens - request.num_cached_tokens ++ ) ++ request.num_computed_tokens = request.num_cached_tokens ++ ++ affected_req_ids.add(request.request_id) ++ ++ return affected_req_ids, total_affected_tokens ++ ++ def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]: ++ total_requests_to_reschedule = 0 ++ total_tokens_to_reschedule = 0 ++ ++ # --- Handle async KV loads (WAITING_FOR_REMOTE_KVS) --- ++ async_load_reqs = ( ++ req ++ for req in self.waiting ++ if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS ++ ) ++ async_affected_req_ids, num_tokens_to_reschedule = ( ++ self._update_requests_with_invalid_blocks( ++ async_load_reqs, invalid_block_ids ++ ) ++ ) ++ ++ total_requests_to_reschedule += len(async_affected_req_ids) ++ total_tokens_to_reschedule += num_tokens_to_reschedule ++ ++ # Mark requests with async KV load failures; they will be rescheduled ++ # once loading completes. ++ self.failed_recving_kv_req_ids |= async_affected_req_ids ++ ++ # --- Handle sync KV loads (running requests) --- ++ sync_affected_req_ids, num_tokens_to_reschedule = ( ++ self._update_requests_with_invalid_blocks(self.running, invalid_block_ids) ++ ) ++ ++ total_requests_to_reschedule += len(sync_affected_req_ids) ++ total_tokens_to_reschedule += num_tokens_to_reschedule ++ ++ if total_requests_to_reschedule: ++ logger.warning( ++ "Recovered from KV load failure: " ++ "%d request(s) rescheduled (%d tokens affected).", ++ total_requests_to_reschedule, ++ total_tokens_to_reschedule, ++ ) ++ ++ # Return the IDs of affected running requests to skip in ++ # update_from_output. ++ return sync_affected_req_ids +diff --git a/vllm/v1/core/single_type_kv_cache_manager.py b/vllm/v1/core/single_type_kv_cache_manager.py +index e889f7804..4ecd9c815 100644 +--- a/vllm/v1/core/single_type_kv_cache_manager.py ++++ b/vllm/v1/core/single_type_kv_cache_manager.py +@@ -142,6 +142,9 @@ class SingleTypeKVCacheManager(ABC): + num_cached_blocks = self.num_cached_block[request.request_id] + num_full_blocks = num_tokens // self.block_size + ++ if num_cached_blocks >= num_full_blocks: ++ return ++ + self.block_pool.cache_full_blocks( + request=request, + blocks=self.req_to_blocks[request.request_id], +diff --git a/vllm/v1/outputs.py b/vllm/v1/outputs.py +index 01f3676ab..efc16b8b6 100644 +--- a/vllm/v1/outputs.py ++++ b/vllm/v1/outputs.py +@@ -2,7 +2,7 @@ + # SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + from abc import ABC, abstractmethod +-from dataclasses import dataclass ++from dataclasses import dataclass, field + from typing import TYPE_CHECKING, NamedTuple, Optional, Union + + import torch +@@ -87,10 +87,11 @@ class KVConnectorOutput: + finished_sending: Optional[set[str]] = None + finished_recving: Optional[set[str]] = None + kv_connector_stats: Optional["KVConnectorStats"] = None ++ invalid_block_ids: set[int] = field(default_factory=set) + + def is_empty(self): + return (not self.finished_sending and not self.finished_recving +- and not self.kv_connector_stats) ++ and not self.kv_connector_stats and not self.invalid_block_ids) + + + # ModelRunnerOutput is serialized and sent to the scheduler process. +diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py +index a438c7777..1518b0827 100644 +--- a/vllm/v1/worker/gpu_model_runner.py ++++ b/vllm/v1/worker/gpu_model_runner.py +@@ -616,6 +616,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): + num_computed_tokens = req_data.num_computed_tokens[i] + new_block_ids = req_data.new_block_ids[i] + resumed_from_preemption = req_data.resumed_from_preemption[i] ++ num_output_tokens = req_data.num_output_tokens[i] ++ req_index = self.input_batch.req_id_to_index.get(req_id) + + # Update the cached states. + req_state.num_computed_tokens = num_computed_tokens +@@ -635,6 +637,17 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): + elif num_new_tokens > 0: + req_state.output_token_ids.extend( + new_token_ids[-num_new_tokens:]) ++ elif num_output_tokens < len(req_state.output_token_ids): ++ # Some output tokens were discarded due to a sync-KV-load ++ # failure. Align the cached state. ++ del req_state.output_token_ids[num_output_tokens:] ++ if req_index is not None: ++ end_idx = ( ++ self.input_batch.num_prompt_tokens[req_index] ++ + num_output_tokens ++ ) ++ self.input_batch.num_tokens[req_index] = end_idx ++ self.input_batch.num_tokens_no_spec[req_index] = end_idx + + # Update the block IDs. + if not resumed_from_preemption: +@@ -649,7 +662,6 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): + # Replace the existing block IDs with the new ones. + req_state.block_ids = new_block_ids + +- req_index = self.input_batch.req_id_to_index.get(req_id) + if req_index is None: + # The request is not in the persistent batch. + # The request was either preempted and resumed later, or was not +diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py +index 8c75e8914..a135a594a 100644 +--- a/vllm/v1/worker/gpu_worker.py ++++ b/vllm/v1/worker/gpu_worker.py +@@ -464,8 +464,7 @@ class Worker(WorkerBase): + + # In case of PP with kv transfer, we need to pass through the + # kv_connector_output +- if (not kv_connector_output.finished_sending +- and not kv_connector_output.finished_recving): ++ if kv_connector_output.is_empty(): + return EMPTY_MODEL_RUNNER_OUTPUT + + output = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT) +diff --git a/vllm/v1/worker/kv_connector_model_runner_mixin.py b/vllm/v1/worker/kv_connector_model_runner_mixin.py +index 7eaff924e..956bb0145 100644 +--- a/vllm/v1/worker/kv_connector_model_runner_mixin.py ++++ b/vllm/v1/worker/kv_connector_model_runner_mixin.py +@@ -120,6 +120,7 @@ class KVConnectorModelRunnerMixin: + + output.finished_sending, output.finished_recving = ( + kv_connector.get_finished(scheduler_output.finished_req_ids)) ++ output.invalid_block_ids = kv_connector.get_block_ids_with_load_errors() + + output.kv_connector_stats = KVConnectorModelRunnerMixin.\ + get_kv_connector_stats() +-- +2.34.1 + diff --git a/ucm/integration/vllm/patch/0.11.0/vllm-adapt-sparse.patch b/ucm/integration/vllm/patch/0.11.0/vllm-adapt.patch similarity index 59% rename from ucm/integration/vllm/patch/0.11.0/vllm-adapt-sparse.patch rename to ucm/integration/vllm/patch/0.11.0/vllm-adapt.patch index 0933c13f8..4e4c4b1c1 100644 --- a/ucm/integration/vllm/patch/0.11.0/vllm-adapt-sparse.patch +++ b/ucm/integration/vllm/patch/0.11.0/vllm-adapt.patch @@ -1,26 +1,33 @@ -From d886cedc2bf71d685dfe292102a26a5464b4f9c1 Mon Sep 17 00:00:00 2001 -From: AooooooA-C -Date: Thu, 8 Jan 2026 00:09:57 -0800 -Subject: [PATCH] apply sparse method patches +From 475ef2fa3d478b8bd8b8b31d114b411c8c11555e Mon Sep 17 00:00:00 2001 +From: fenghao +Date: Sat, 31 Jan 2026 00:20:00 -0800 +Subject: [PATCH] [Patch] UCM patch for vllm 0.11.0 with sparse and load + failure --- - vllm/attention/layer.py | 65 +++++++++++++++- - vllm/model_executor/models/llama.py | 24 ++++++ - vllm/model_executor/models/qwen2.py | 24 ++++++ - vllm/v1/attention/backends/flash_attn.py | 11 +++ - vllm/v1/attention/backends/mla/common.py | 21 ++++++ - vllm/v1/attention/backends/mla/flashmla.py | 18 +++++ - vllm/v1/core/kv_cache_manager.py | 10 ++- - vllm/v1/core/kv_cache_utils.py | 14 ++++ - vllm/v1/core/sched/output.py | 3 + - vllm/v1/core/sched/scheduler.py | 36 ++++++++- - vllm/v1/worker/block_table.py | 13 ++++ - vllm/v1/worker/gpu_model_runner.py | 87 +++++++++++++++++++--- - vllm/v1/worker/gpu_worker.py | 3 + - 13 files changed, 315 insertions(+), 14 deletions(-) + vllm/attention/layer.py | 63 +++++ + .../kv_transfer/kv_connector/utils.py | 3 + + .../kv_transfer/kv_connector/v1/base.py | 20 ++ + vllm/model_executor/models/llama.py | 24 ++ + vllm/model_executor/models/qwen2.py | 24 ++ + vllm/v1/attention/backends/flash_attn.py | 11 + + vllm/v1/attention/backends/mla/common.py | 21 ++ + vllm/v1/attention/backends/mla/flashmla.py | 18 ++ + vllm/v1/core/block_pool.py | 2 +- + vllm/v1/core/kv_cache_manager.py | 10 +- + vllm/v1/core/kv_cache_utils.py | 14 ++ + vllm/v1/core/sched/output.py | 5 + + vllm/v1/core/sched/scheduler.py | 231 +++++++++++++++++- + vllm/v1/core/single_type_kv_cache_manager.py | 3 + + vllm/v1/outputs.py | 5 +- + vllm/v1/worker/block_table.py | 13 + + vllm/v1/worker/gpu_model_runner.py | 101 +++++++- + vllm/v1/worker/gpu_worker.py | 6 +- + .../worker/kv_connector_model_runner_mixin.py | 1 + + 19 files changed, 544 insertions(+), 31 deletions(-) diff --git a/vllm/attention/layer.py b/vllm/attention/layer.py -index 79879b680..06fa41a4d 100644 +index 79879b680..c5324e47e 100644 --- a/vllm/attention/layer.py +++ b/vllm/attention/layer.py @@ -3,6 +3,7 @@ @@ -47,8 +54,7 @@ index 79879b680..06fa41a4d 100644 + query, key, value, _ = maybe_execute_sparse_attention_begin(query, key, value, layer_name, forward_context) output = self.impl.forward(self, query, key, value, kv_cache, attn_metadata) -- -+ + + maybe_execute_sparse_attention_finished(query, key, value, output, layer_name, forward_context) maybe_save_kv_layer_to_connector(layer_name, kv_cache) return output @@ -130,8 +136,67 @@ index 79879b680..06fa41a4d 100644 + + ucm_sparse.attention_finished(query, key, value, attn_output, layer_name, forward_context, phase) \ No newline at end of file +diff --git a/vllm/distributed/kv_transfer/kv_connector/utils.py b/vllm/distributed/kv_transfer/kv_connector/utils.py +index efa4c9abf..4bef1bd23 100644 +--- a/vllm/distributed/kv_transfer/kv_connector/utils.py ++++ b/vllm/distributed/kv_transfer/kv_connector/utils.py +@@ -143,6 +143,7 @@ class KVOutputAggregator: + finished_sending = set[str]() + finished_recving = set[str]() + aggregated_kv_connector_stats = None ++ invalid_block_ids = set[int]() + for model_runner_output in outputs: + output = model_runner_output.kv_connector_output + if not output: +@@ -164,6 +165,7 @@ class KVOutputAggregator: + type(kv_connector_stats)) + aggregated_kv_connector_stats = \ + aggregated_kv_connector_stats.aggregate(kv_connector_stats) ++ invalid_block_ids |= output.invalid_block_ids + + # select output of the worker specified by output_rank + output = outputs[output_rank] +@@ -172,6 +174,7 @@ class KVOutputAggregator: + finished_sending=finished_sending or None, + finished_recving=finished_recving or None, + kv_connector_stats=aggregated_kv_connector_stats or None, ++ invalid_block_ids=invalid_block_ids, + ) + + return output +diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/base.py b/vllm/distributed/kv_transfer/kv_connector/v1/base.py +index 184d0a62f..0d71e4cc1 100644 +--- a/vllm/distributed/kv_transfer/kv_connector/v1/base.py ++++ b/vllm/distributed/kv_transfer/kv_connector/v1/base.py +@@ -229,6 +229,26 @@ class KVConnectorBase_V1(ABC): + """ + return None, None + ++ def get_block_ids_with_load_errors(self) -> set[int]: ++ """ ++ Get the set of block IDs that failed to load. ++ ++ Returns: ++ Set of block IDs that encountered load errors. ++ Empty set if no load errors occurred. ++ ++ Notes: ++ - Applies to both sync- and async-loading requests. ++ - Async loading: failed blocks may be reported in any forward pass ++ up to and including the pass where the request ID is returned by ++ `get_finished()`. Even if failures occur, the request must still ++ be reported via `get_finished()`, and the failed block IDs must ++ appear here no later than that same pass. ++ - Sync loading: failed blocks should be reported in the forward ++ pass in which they are detected. ++ """ ++ return set() ++ + def shutdown(self): + """ + Shutdown the connector. This is called when the worker process diff --git a/vllm/model_executor/models/llama.py b/vllm/model_executor/models/llama.py -index c7dd134ea..db64f258c 100644 +index c7dd134ea..da5d725b0 100644 --- a/vllm/model_executor/models/llama.py +++ b/vllm/model_executor/models/llama.py @@ -56,6 +56,12 @@ from .utils import (AutoWeightsLoader, PPMissingLayer, extract_layer_index, @@ -188,7 +253,7 @@ index c7dd134ea..db64f258c 100644 return IntermediateTensors({ "hidden_states": hidden_states, diff --git a/vllm/model_executor/models/qwen2.py b/vllm/model_executor/models/qwen2.py -index c536b0f60..dc9460057 100644 +index c536b0f60..38a28fd1f 100644 --- a/vllm/model_executor/models/qwen2.py +++ b/vllm/model_executor/models/qwen2.py @@ -58,6 +58,12 @@ from .utils import (AutoWeightsLoader, PPMissingLayer, extract_layer_index, @@ -279,7 +344,7 @@ index f0770f744..717dc836e 100755 causal = common_attn_metadata.causal diff --git a/vllm/v1/attention/backends/mla/common.py b/vllm/v1/attention/backends/mla/common.py -index 963f1c5ab..c5667aadc 100755 +index 963f1c5ab..71f147ba1 100755 --- a/vllm/v1/attention/backends/mla/common.py +++ b/vllm/v1/attention/backends/mla/common.py @@ -194,6 +194,7 @@ from typing import Generic, Optional, TypeVar, Union @@ -338,7 +403,7 @@ index 963f1c5ab..c5667aadc 100755 if has_decode: assert attn_metadata.decode is not None decode_q_nope, decode_q_pe = decode_q.split( -@@ -1771,8 +1788,12 @@ class MLACommonImpl(MLACommonBaseImpl[M], Generic[M]): +@@ -1771,9 +1788,13 @@ class MLACommonImpl(MLACommonBaseImpl[M], Generic[M]): decode_q = get_dcp_group().all_gather(decode_q, dim=1) # call decode attn @@ -346,11 +411,12 @@ index 963f1c5ab..c5667aadc 100755 + attn_out, lse = self._forward_decode(decode_q, kv_cache, attn_metadata, layer) -+ -+ maybe_execute_sparse_attention_finished(torch.cat([decode_ql_nope, decode_q_pe],dim=-1), decode_ql_nope, decode_q_pe, output[:num_decode_tokens], layer.layer_name, forward_context, "decode") ++ maybe_execute_sparse_attention_finished(torch.cat([decode_ql_nope, decode_q_pe],dim=-1), decode_ql_nope, decode_q_pe, output[:num_decode_tokens], layer.layer_name, forward_context, "decode") ++ # recorect dcp attn_out with lse. if self.dcp_world_size > 1: + attn_out = cp_lse_ag_out_rs(attn_out, lse, get_dcp_group()) diff --git a/vllm/v1/attention/backends/mla/flashmla.py b/vllm/v1/attention/backends/mla/flashmla.py index 67c21f83c..43c26d333 100644 --- a/vllm/v1/attention/backends/mla/flashmla.py @@ -414,6 +480,19 @@ index 67c21f83c..43c26d333 100644 ) +diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py +index 3cc738304..617a724a1 100644 +--- a/vllm/v1/core/block_pool.py ++++ b/vllm/v1/core/block_pool.py +@@ -211,7 +211,7 @@ class BlockPool: + block_size: Number of tokens in each block. + kv_cache_group_id: The id of the KV cache group. + """ +- if num_cached_blocks == num_full_blocks: ++ if num_cached_blocks >= num_full_blocks: + return + new_full_blocks = blocks[num_cached_blocks:num_full_blocks] + assert len(request.block_hashes) >= num_full_blocks diff --git a/vllm/v1/core/kv_cache_manager.py b/vllm/v1/core/kv_cache_manager.py index 401327f72..eb7acf00d 100644 --- a/vllm/v1/core/kv_cache_manager.py @@ -482,10 +561,26 @@ index 2ff1bb681..dca14b5d3 100644 kv_cache_tensors = [ KVCacheTensor(size=per_layer_specs[layer_name].page_size_bytes * diff --git a/vllm/v1/core/sched/output.py b/vllm/v1/core/sched/output.py -index 209fc2a44..4c7617ca6 100644 +index 209fc2a44..f81eb93ed 100644 --- a/vllm/v1/core/sched/output.py +++ b/vllm/v1/core/sched/output.py -@@ -164,3 +164,6 @@ class SchedulerOutput: +@@ -101,6 +101,7 @@ class CachedRequestData: + new_token_ids: list[list[int]] + new_block_ids: list[Optional[tuple[list[int], ...]]] + num_computed_tokens: list[int] ++ num_output_tokens: list[int] + + @property + def num_reqs(self) -> int: +@@ -114,6 +115,7 @@ class CachedRequestData: + new_token_ids=[], + new_block_ids=[], + num_computed_tokens=[], ++ num_output_tokens=[], + ) + + +@@ -164,3 +166,6 @@ class SchedulerOutput: # KV Cache Connector metadata. kv_connector_metadata: Optional[KVConnectorMetadata] = None @@ -493,7 +588,7 @@ index 209fc2a44..4c7617ca6 100644 + # modified slots by sparse algorithm + req_sparsed_slots: dict[str, int] = None diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py -index 2b2cd63c2..e42ed67b1 100644 +index 2b2cd63c2..6032f2fc6 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -36,6 +36,10 @@ from vllm.v1.outputs import DraftTokenIds, KVConnectorOutput, ModelRunnerOutput @@ -515,11 +610,10 @@ index 2b2cd63c2..e42ed67b1 100644 if self.vllm_config.kv_transfer_config is not None: assert len(self.kv_cache_config.kv_cache_groups) == 1, ( "Multiple KV cache groups are not currently supported " -@@ -91,6 +96,14 @@ class Scheduler(SchedulerInterface): - "with KV connectors") +@@ -92,6 +97,14 @@ class Scheduler(SchedulerInterface): self.connector = KVConnectorFactory.create_connector( config=self.vllm_config, role=KVConnectorRole.SCHEDULER) -+ + + # Initialize UCM Sparse if available + ucm_config = Config(self.vllm_config.kv_transfer_config) + ucm_sparse_config = ucm_config.get_config().get("ucm_sparse_config") @@ -527,10 +621,19 @@ index 2b2cd63c2..e42ed67b1 100644 + ensure_ucm_sparse_initialized(vllm_config, role=UcmSparseRole.SCHEDULER) + self.ucm_sparse = get_ucm_sparse() + logger.info("UCM Sparse initialized successfully: {}".format(self.ucm_sparse)) - ++ self.kv_event_publisher = EventPublisherFactory.create( self.kv_events_config, -@@ -207,9 +220,15 @@ class Scheduler(SchedulerInterface): + self.parallel_config.data_parallel_rank, +@@ -133,6 +146,7 @@ class Scheduler(SchedulerInterface): + + # KV Connector: requests in process of async KV loading or recving + self.finished_recving_kv_req_ids: set[str] = set() ++ self.failed_recving_kv_req_ids: set[str] = set() + + # Encoder-related. + # Calculate encoder cache size if applicable +@@ -207,9 +221,15 @@ class Scheduler(SchedulerInterface): # First, schedule the RUNNING requests. req_index = 0 @@ -546,7 +649,7 @@ index 2b2cd63c2..e42ed67b1 100644 num_new_tokens = (request.num_tokens_with_spec + request.num_output_placeholders - request.num_computed_tokens) -@@ -255,7 +274,8 @@ class Scheduler(SchedulerInterface): +@@ -255,7 +275,8 @@ class Scheduler(SchedulerInterface): new_blocks = self.kv_cache_manager.allocate_slots( request, num_new_tokens, @@ -556,7 +659,7 @@ index 2b2cd63c2..e42ed67b1 100644 if new_blocks is None: # The request cannot be scheduled. # Preempt the lowest-priority request. -@@ -339,6 +359,11 @@ class Scheduler(SchedulerInterface): +@@ -339,6 +360,11 @@ class Scheduler(SchedulerInterface): request = self.waiting.peek_request() @@ -568,7 +671,7 @@ index 2b2cd63c2..e42ed67b1 100644 # KVTransfer: skip request if still waiting for remote kvs. if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: is_ready = self._update_waiting_for_remote_kv(request) -@@ -476,6 +501,7 @@ class Scheduler(SchedulerInterface): +@@ -476,6 +502,7 @@ class Scheduler(SchedulerInterface): num_lookahead_tokens=effective_lookahead_tokens, delay_cache_blocks=load_kv_async, num_encoder_tokens=num_encoder_tokens, @@ -576,7 +679,7 @@ index 2b2cd63c2..e42ed67b1 100644 ) if new_blocks is None: -@@ -587,6 +613,7 @@ class Scheduler(SchedulerInterface): +@@ -587,6 +614,7 @@ class Scheduler(SchedulerInterface): scheduled_spec_decode_tokens=scheduled_spec_decode_tokens, scheduled_encoder_inputs=scheduled_encoder_inputs, num_common_prefix_blocks=num_common_prefix_blocks, @@ -584,7 +687,69 @@ index 2b2cd63c2..e42ed67b1 100644 # finished_req_ids is an existing state in the scheduler, # instead of being newly scheduled in this step. # It contains the request IDs that are finished in between -@@ -1097,6 +1124,10 @@ class Scheduler(SchedulerInterface): +@@ -669,6 +697,7 @@ class Scheduler(SchedulerInterface): + new_token_ids: list[list[int]] = [] + new_block_ids: list[Optional[tuple[list[int], ...]]] = [] + num_computed_tokens: list[int] = [] ++ num_output_tokens: list[int] = [] + + use_connector = self.connector is not None + for req in itertools.chain(running_reqs, resumed_reqs): +@@ -693,6 +722,7 @@ class Scheduler(SchedulerInterface): + new_block_ids.append( + req_to_new_blocks[req_id].get_block_ids(allow_none=True)) + num_computed_tokens.append(req.num_computed_tokens) ++ num_output_tokens.append(len(req.output_token_ids)) + # Because resumed_reqs is usually empty, it is more efficient to do + # in-place appending so that we don't need to allocate a new list. + resumed_from_preemption = [False] * len(running_reqs) +@@ -704,6 +734,7 @@ class Scheduler(SchedulerInterface): + new_token_ids=new_token_ids, + new_block_ids=new_block_ids, + num_computed_tokens=num_computed_tokens, ++ num_output_tokens=num_output_tokens, + ) + + def _try_schedule_encoder_inputs( +@@ -875,6 +906,15 @@ class Scheduler(SchedulerInterface): + spec_decoding_stats: Optional[SpecDecodingStats] = None + kv_connector_stats = (kv_connector_output.kv_connector_stats + if kv_connector_output else None) ++ ++ failed_kv_load_req_ids = None ++ if kv_connector_output and kv_connector_output.invalid_block_ids: ++ # These blocks contain externally computed tokens that failed to ++ # load. Identify affected requests and adjust their computed token ++ # count to trigger recomputation of the invalid blocks. ++ failed_kv_load_req_ids = self._handle_invalid_blocks( ++ kv_connector_output.invalid_block_ids ++ ) + + # NOTE(woosuk): As len(num_scheduled_tokens) can be up to 1K or more, + # the below loop can be a performance bottleneck. We should do our best +@@ -883,6 +923,9 @@ class Scheduler(SchedulerInterface): + stopped_preempted_reqs: set[Request] = set() + for req_id, num_tokens_scheduled in num_scheduled_tokens.items(): + assert num_tokens_scheduled > 0 ++ if failed_kv_load_req_ids and req_id in failed_kv_load_req_ids: ++ # Skip requests that were recovered from KV load failure ++ continue + request = self.requests.get(req_id) + if request is None: + # The request is already finished. This can happen if the +@@ -905,7 +948,10 @@ class Scheduler(SchedulerInterface): + # tokens and rejections. If some tokens are rejected, + # num_computed_tokens is decreased by the number of rejected + # tokens. +- request.num_computed_tokens -= num_rejected ++ if request.num_computed_tokens > 0: ++ request.num_computed_tokens -= num_rejected ++ if request.num_output_placeholders > 0: ++ request.num_output_placeholders -= num_rejected + spec_decoding_stats = self.make_spec_decoding_stats( + spec_decoding_stats, + num_draft_tokens=num_draft_tokens, +@@ -1097,6 +1143,10 @@ class Scheduler(SchedulerInterface): def add_request(self, request: Request) -> None: self.waiting.add_request(request) self.requests[request.request_id] = request @@ -595,7 +760,7 @@ index 2b2cd63c2..e42ed67b1 100644 if self.log_stats: request.record_event(EngineCoreEventType.QUEUED) -@@ -1147,6 +1178,9 @@ class Scheduler(SchedulerInterface): +@@ -1147,6 +1197,9 @@ class Scheduler(SchedulerInterface): def _free_request(self, request: Request) -> Optional[dict[str, Any]]: assert request.is_finished() @@ -605,6 +770,232 @@ index 2b2cd63c2..e42ed67b1 100644 delay_free_blocks, kv_xfer_params = self._connector_finished(request) self.encoder_cache_manager.free(request) request_id = request.request_id +@@ -1250,18 +1303,31 @@ class Scheduler(SchedulerInterface): + if request.request_id not in self.finished_recving_kv_req_ids: + return False + +- # Now that the blocks are ready, actually cache them. +- (block_ids, ) = self.kv_cache_manager.get_block_ids(request.request_id) +- num_computed_tokens = len(block_ids) * self.block_size +- # Handle the case where num request tokens less than one block. +- num_computed_tokens = min(num_computed_tokens, request.num_tokens) +- if num_computed_tokens == request.num_tokens: +- num_computed_tokens -= 1 +- # This will cache the blocks iff caching is enabled. +- self.kv_cache_manager.cache_blocks(request, num_computed_tokens) ++ if request.request_id in self.failed_recving_kv_req_ids: ++ # Request had KV load failures; num_computed_tokens was already ++ # updated in _update_requests_with_invalid_blocks ++ if request.num_computed_tokens: ++ # Cache any valid computed tokens. ++ self.kv_cache_manager.cache_blocks(request, request.num_computed_tokens) ++ else: ++ # No valid computed tokens, release allocated blocks. ++ # There may be a local cache hit on retry. ++ self.kv_cache_manager.free(request) + +- # Update the request state for scheduling. +- request.num_computed_tokens = num_computed_tokens ++ self.failed_recving_kv_req_ids.remove(request.request_id) ++ else: ++ # Now that the blocks are ready, actually cache them. ++ (block_ids,) = self.kv_cache_manager.get_block_ids(request.request_id) ++ num_computed_tokens = len(block_ids) * self.block_size ++ # Handle the case where num request tokens less than one block. ++ num_computed_tokens = min(num_computed_tokens, request.num_tokens) ++ if num_computed_tokens == request.num_tokens: ++ num_computed_tokens -= 1 ++ # This will cache the blocks iff caching is enabled. ++ self.kv_cache_manager.cache_blocks(request, num_computed_tokens) ++ ++ # Update the request state for scheduling. ++ request.num_computed_tokens = num_computed_tokens + + # Return that we are ready. + self.finished_recving_kv_req_ids.remove(request.request_id) +@@ -1294,3 +1360,142 @@ class Scheduler(SchedulerInterface): + "but the request is already freed.", req_id) + else: + self._free_blocks(self.requests[req_id]) ++ ++ def _update_requests_with_invalid_blocks( ++ self, requests: Iterable[Request], invalid_block_ids: set[int] ++ ) -> tuple[set[str], int]: ++ """ ++ Identify and update requests affected by invalid KV cache blocks. ++ ++ This method scans the given requests, detects those with invalid blocks ++ and adjusts their `num_computed_tokens` to the longest valid prefix. ++ For observability, it also accumulates the total number of tokens that ++ will need to be recomputed across all affected requests. ++ ++ Args: ++ requests: The set of requests to scan for invalid blocks. ++ invalid_block_ids: IDs of invalid blocks. ++ ++ Returns: ++ tuple: ++ - affected_req_ids (set[str]): IDs of requests impacted by ++ invalid blocks. ++ - total_affected_tokens (int): Total number of tokens that must ++ be recomputed across all affected requests (for observability). ++ """ ++ affected_req_ids: set[str] = set() ++ total_affected_tokens = 0 ++ # If a block is invalid and shared by multiple requests in the batch, ++ # these requests must be rescheduled, but only the first will recompute ++ # it. This set tracks blocks already marked for recomputation. ++ marked_invalid_block_ids: set[int] = set() ++ for request in requests: ++ is_affected = False ++ marked_invalid_block = False ++ req_id = request.request_id ++ # TODO (davidb): add support for hybrid memory allocator ++ (req_block_ids,) = self.kv_cache_manager.get_block_ids(req_id) ++ # We iterate only over blocks that may contain externally computed ++ # tokens ++ if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: ++ # Async loading. If num_computed_tokens is set it implies we ++ # already processed some block failures for it in a prior step ++ req_num_computed_tokens = ( ++ request.num_computed_tokens ++ if req_id in self.failed_recving_kv_req_ids ++ else len(req_block_ids) * self.block_size ++ ) ++ else: ++ # Sync loading. num_computed_tokens includes new tokens ++ req_num_computed_tokens = request.num_cached_tokens ++ ++ req_num_computed_blocks = ( ++ req_num_computed_tokens + self.block_size - 1 ++ ) // self.block_size ++ for idx, block_id in zip(range(req_num_computed_blocks), req_block_ids): ++ if block_id not in invalid_block_ids: ++ continue ++ ++ is_affected = True ++ ++ if block_id in marked_invalid_block_ids: ++ # This invalid block is shared with a previous request ++ # and was already marked for recomputation. ++ # This means this request can still consider this block ++ # as computed when rescheduled. ++ # Currently this only applies to sync loading; Async ++ # loading does not yet support block sharing ++ continue ++ ++ marked_invalid_block_ids.add(block_id) ++ ++ if marked_invalid_block: ++ # This request has already marked an invalid block for ++ # recomputation and updated its num_computed_tokens. ++ continue ++ ++ marked_invalid_block = True ++ # Truncate the computed tokens at the first failed block ++ request.num_computed_tokens = idx * self.block_size ++ total_affected_tokens += ( ++ req_num_computed_tokens - request.num_computed_tokens ++ ) ++ ++ if is_affected: ++ if not marked_invalid_block: ++ # All invalid blocks of this request are shared with ++ # previous requests and will be recomputed by them. ++ # Revert to considering only cached tokens as computed. ++ # Currently this only applies to sync loading; Async ++ # loading does not yet support block sharing ++ total_affected_tokens += ( ++ request.num_computed_tokens - request.num_cached_tokens ++ ) ++ request.num_computed_tokens = request.num_cached_tokens ++ ++ affected_req_ids.add(request.request_id) ++ ++ return affected_req_ids, total_affected_tokens ++ ++ def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]: ++ total_requests_to_reschedule = 0 ++ total_tokens_to_reschedule = 0 ++ ++ # --- Handle async KV loads (WAITING_FOR_REMOTE_KVS) --- ++ async_load_reqs = ( ++ req ++ for req in self.waiting ++ if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS ++ ) ++ async_affected_req_ids, num_tokens_to_reschedule = ( ++ self._update_requests_with_invalid_blocks( ++ async_load_reqs, invalid_block_ids ++ ) ++ ) ++ ++ total_requests_to_reschedule += len(async_affected_req_ids) ++ total_tokens_to_reschedule += num_tokens_to_reschedule ++ ++ # Mark requests with async KV load failures; they will be rescheduled ++ # once loading completes. ++ self.failed_recving_kv_req_ids |= async_affected_req_ids ++ ++ # --- Handle sync KV loads (running requests) --- ++ sync_affected_req_ids, num_tokens_to_reschedule = ( ++ self._update_requests_with_invalid_blocks(self.running, invalid_block_ids) ++ ) ++ ++ total_requests_to_reschedule += len(sync_affected_req_ids) ++ total_tokens_to_reschedule += num_tokens_to_reschedule ++ ++ if total_requests_to_reschedule: ++ logger.warning( ++ "Recovered from KV load failure: " ++ "%d request(s) rescheduled (%d tokens affected).", ++ total_requests_to_reschedule, ++ total_tokens_to_reschedule, ++ ) ++ ++ # Return the IDs of affected running requests to skip in ++ # update_from_output. ++ return sync_affected_req_ids +diff --git a/vllm/v1/core/single_type_kv_cache_manager.py b/vllm/v1/core/single_type_kv_cache_manager.py +index e889f7804..4ecd9c815 100644 +--- a/vllm/v1/core/single_type_kv_cache_manager.py ++++ b/vllm/v1/core/single_type_kv_cache_manager.py +@@ -142,6 +142,9 @@ class SingleTypeKVCacheManager(ABC): + num_cached_blocks = self.num_cached_block[request.request_id] + num_full_blocks = num_tokens // self.block_size + ++ if num_cached_blocks >= num_full_blocks: ++ return ++ + self.block_pool.cache_full_blocks( + request=request, + blocks=self.req_to_blocks[request.request_id], +diff --git a/vllm/v1/outputs.py b/vllm/v1/outputs.py +index 01f3676ab..efc16b8b6 100644 +--- a/vllm/v1/outputs.py ++++ b/vllm/v1/outputs.py +@@ -2,7 +2,7 @@ + # SPDX-FileCopyrightText: Copyright contributors to the vLLM project + + from abc import ABC, abstractmethod +-from dataclasses import dataclass ++from dataclasses import dataclass, field + from typing import TYPE_CHECKING, NamedTuple, Optional, Union + + import torch +@@ -87,10 +87,11 @@ class KVConnectorOutput: + finished_sending: Optional[set[str]] = None + finished_recving: Optional[set[str]] = None + kv_connector_stats: Optional["KVConnectorStats"] = None ++ invalid_block_ids: set[int] = field(default_factory=set) + + def is_empty(self): + return (not self.finished_sending and not self.finished_recving +- and not self.kv_connector_stats) ++ and not self.kv_connector_stats and not self.invalid_block_ids) + + + # ModelRunnerOutput is serialized and sent to the scheduler process. diff --git a/vllm/v1/worker/block_table.py b/vllm/v1/worker/block_table.py index 82b6d1b51..85cdcb7ac 100644 --- a/vllm/v1/worker/block_table.py @@ -637,7 +1028,7 @@ index 82b6d1b51..85cdcb7ac 100644 for i, block_table in enumerate(self.block_tables): block_table.add_row(block_ids[i], row_idx) diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py -index a438c7777..28f619c8f 100644 +index a438c7777..00a50d43e 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -1,6 +1,7 @@ @@ -666,7 +1057,7 @@ index a438c7777..28f619c8f 100644 self.requests.pop(req_id, None) # Remove the finished requests from the persistent batch. # NOTE(woosuk): There could be an edge case where finished_req_ids and -@@ -611,11 +616,13 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -611,11 +616,15 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): # Update the states of the running/resumed requests. is_last_rank = get_pp_group().is_last_rank req_data = scheduler_output.scheduled_cached_reqs @@ -676,12 +1067,27 @@ index a438c7777..28f619c8f 100644 num_computed_tokens = req_data.num_computed_tokens[i] new_block_ids = req_data.new_block_ids[i] resumed_from_preemption = req_data.resumed_from_preemption[i] ++ num_output_tokens = req_data.num_output_tokens[i] ++ req_index = self.input_batch.req_id_to_index.get(req_id) + is_sparsed_request = req_sparsed_slots[req_id] != INVALID_SLOT # Update the cached states. req_state.num_computed_tokens = num_computed_tokens -@@ -637,17 +644,16 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -635,21 +644,30 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): + elif num_new_tokens > 0: + req_state.output_token_ids.extend( new_token_ids[-num_new_tokens:]) ++ elif num_output_tokens < len(req_state.output_token_ids): ++ # Some output tokens were discarded due to a sync-KV-load ++ # failure. Align the cached state. ++ del req_state.output_token_ids[num_output_tokens:] ++ if req_index is not None: ++ end_idx = ( ++ self.input_batch.num_prompt_tokens[req_index] ++ + num_output_tokens ++ ) ++ self.input_batch.num_tokens[req_index] = end_idx ++ self.input_batch.num_tokens_no_spec[req_index] = end_idx # Update the block IDs. - if not resumed_from_preemption: @@ -701,9 +1107,11 @@ index a438c7777..28f619c8f 100644 - # Replace the existing block IDs with the new ones. - req_state.block_ids = new_block_ids - req_index = self.input_batch.req_id_to_index.get(req_id) +- req_index = self.input_batch.req_id_to_index.get(req_id) if req_index is None: -@@ -660,6 +666,10 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): + # The request is not in the persistent batch. + # The request was either preempted and resumed later, or was not +@@ -660,6 +678,10 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): # Update the persistent batch. self.input_batch.num_computed_tokens_cpu[req_index] = ( num_computed_tokens) @@ -714,7 +1122,7 @@ index a438c7777..28f619c8f 100644 if new_block_ids is not None: self.input_batch.block_table.append_row( new_block_ids, req_index) -@@ -968,6 +978,20 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -968,6 +990,20 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): if self.uses_mrope: self._calc_mrope_positions(scheduler_output) @@ -735,7 +1143,7 @@ index a438c7777..28f619c8f 100644 # Get token indices. # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] # -> [0, 1, M, M + 1, M + 2, M + 3, M + 4, 2 * M, 2 * M + 1, 2 * M + 2] -@@ -1031,7 +1055,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -1031,7 +1067,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): output_idx += num_sched self.input_batch.block_table.compute_slot_mapping( @@ -744,7 +1152,7 @@ index a438c7777..28f619c8f 100644 self.input_batch.block_table.commit_slot_mapping( total_num_scheduled_tokens) -@@ -1057,9 +1081,14 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -1057,9 +1093,14 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): uniform_decode=uniform_decode, vllm_config=self.vllm_config) @@ -762,7 +1170,7 @@ index a438c7777..28f619c8f 100644 # Fill unused with 0 for full cuda graph mode. self.seq_lens.np[num_reqs:].fill(0) self.seq_lens.copy_to_gpu() -@@ -1073,7 +1102,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -1073,7 +1114,7 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): # Record the index of requests that should not be sampled, # so that we could clear the sampled tokens before returning @@ -771,7 +1179,7 @@ index a438c7777..28f619c8f 100644 discard_request_indices = np.nonzero(discard_requests_mask)[0] self.num_discarded_requests = len(discard_request_indices) self.discard_request_indices.np[:self.num_discarded_requests] = ( -@@ -1091,6 +1120,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -1091,6 +1132,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): non_blocking=True) else: # Common case (1D positions) @@ -780,7 +1188,7 @@ index a438c7777..28f619c8f 100644 self.positions.copy_to_gpu(total_num_scheduled_tokens) use_spec_decode = len( -@@ -2295,6 +2326,9 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -2295,6 +2338,9 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): ), record_function_or_nullcontext("Forward"), self.maybe_get_kv_connector_output(scheduler_output) as kv_connector_output): @@ -790,7 +1198,7 @@ index a438c7777..28f619c8f 100644 model_output = self.model( input_ids=input_ids, positions=positions, -@@ -2303,6 +2337,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -2303,6 +2349,8 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): **model_kwargs, ) @@ -799,7 +1207,7 @@ index a438c7777..28f619c8f 100644 with record_function_or_nullcontext("Postprocess"): if self.use_aux_hidden_state_outputs: # True when EAGLE 3 is used. -@@ -2584,6 +2620,30 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -2584,6 +2632,30 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): ) return draft_token_ids @@ -830,7 +1238,7 @@ index a438c7777..28f619c8f 100644 def update_config(self, overrides: dict[str, Any]) -> None: allowed_config_names = {"load_config", "model_config"} for config_name, config_overrides in overrides.items(): -@@ -3928,6 +3988,11 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): +@@ -3928,6 +4000,11 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin): kv_caches = self._reshape_kv_cache_tensors(kv_cache_config, kv_cache_raw_tensors) @@ -843,7 +1251,7 @@ index a438c7777..28f619c8f 100644 for layer_name, target_layer_name in self.shared_kv_cache_layers.items( ): diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py -index 8c75e8914..c3e9e781f 100644 +index 8c75e8914..60eb0a83f 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -35,6 +35,8 @@ from vllm.v1.worker.gpu_model_runner import GPUModelRunner @@ -855,11 +1263,33 @@ index 8c75e8914..c3e9e781f 100644 logger = init_logger(__name__) if TYPE_CHECKING: -@@ -708,3 +710,4 @@ def init_worker_distributed_environment( +@@ -464,8 +466,7 @@ class Worker(WorkerBase): + + # In case of PP with kv transfer, we need to pass through the + # kv_connector_output +- if (not kv_connector_output.finished_sending +- and not kv_connector_output.finished_recving): ++ if kv_connector_output.is_empty(): + return EMPTY_MODEL_RUNNER_OUTPUT + + output = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT) +@@ -708,3 +709,4 @@ def init_worker_distributed_environment( parallel_config.decode_context_parallel_size) ensure_kv_transfer_initialized(vllm_config) + ensure_ucm_sparse_initialized(vllm_config) +diff --git a/vllm/v1/worker/kv_connector_model_runner_mixin.py b/vllm/v1/worker/kv_connector_model_runner_mixin.py +index 7eaff924e..956bb0145 100644 +--- a/vllm/v1/worker/kv_connector_model_runner_mixin.py ++++ b/vllm/v1/worker/kv_connector_model_runner_mixin.py +@@ -120,6 +120,7 @@ class KVConnectorModelRunnerMixin: + + output.finished_sending, output.finished_recving = ( + kv_connector.get_finished(scheduler_output.finished_req_ids)) ++ output.invalid_block_ids = kv_connector.get_block_ids_with_load_errors() + + output.kv_connector_stats = KVConnectorModelRunnerMixin.\ + get_kv_connector_stats() -- 2.34.1