Merged
31 commits
7716866  support qwen3-embedding (lizexu123, Sep 22, 2025)
5fde033  merge develop (lizexu123, Oct 10, 2025)
001f23d  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 10, 2025)
73141d4  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 24, 2025)
6a2ddaf  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 24, 2025)
85d14ba  support qwen3-embedding-0.6b (lizexu123, Oct 24, 2025)
8200040  fix (lizexu123, Oct 24, 2025)
5832cc4  update (lizexu123, Oct 27, 2025)
58616e4  fix bug (lizexu123, Oct 27, 2025)
ad2f7b6  fix test_return_token_ids.py and update enable_thinking (lizexu123, Oct 27, 2025)
aeddcac  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 27, 2025)
955fac1  fix mtp dummy_run (lizexu123, Oct 27, 2025)
21c20a7  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 28, 2025)
0206d42  merge develop (lizexu123, Oct 28, 2025)
30795d2  fix np.float32 (lizexu123, Oct 28, 2025)
6bc1ed2  delete FD_DISABLE_CHUNKED_PREFILL and FD_USE_GET_SAVE_OUTPUT_V1 (lizexu123, Oct 28, 2025)
f439ca2  delete and build_stream_transfer_data (lizexu123, Oct 28, 2025)
27d686b  fix test_update_v1: (lizexu123, Oct 28, 2025)
a6a9483  update develop (lizexu123, Oct 28, 2025)
15a0df8  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 29, 2025)
57e76be  fix (lizexu123, Oct 29, 2025)
eae6db6  fix (lizexu123, Oct 29, 2025)
90d5ee1  update dummy_run post_process (lizexu123, Oct 29, 2025)
1a35691  delete test_update_v1 (lizexu123, Oct 29, 2025)
2fa8733  fix (lizexu123, Oct 29, 2025)
5b12f6f  fix dummy_run (lizexu123, Oct 29, 2025)
7ca73ba  fix model_path (lizexu123, Oct 30, 2025)
1e3cae5  fix model_path (lizexu123, Oct 30, 2025)
90ef114  fix dummy_run (lizexu123, Oct 30, 2025)
6c20954  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 30, 2025)
c8c3664  merge develop (lizexu123, Oct 31, 2025)
3 changes: 3 additions & 0 deletions fastdeploy/config.py
@@ -242,6 +242,9 @@ def _post_init(self):

         self.enable_mm = is_multimodal_model

+        if self.runner_type == "pooling":
+            os.environ["FD_USE_GET_SAVE_OUTPUT_V1"] = "1"
+
         if self.runner_type == "generate" and not is_generative_model:
             if is_multimodal_model:
                 pass
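For context, a runnable sketch of what the added gate does; `_DemoConfig` below is a stand-in invented for illustration, not FastDeploy's config class. When `runner_type` is `"pooling"`, `_post_init` forces the v1 get-save-output path by exporting `FD_USE_GET_SAVE_OUTPUT_V1` before workers read it:

```python
# Illustrative stand-in for the config object; only the added gate is modeled.
import os

class _DemoConfig:
    def __init__(self, runner_type: str):
        self.runner_type = runner_type
        self._post_init()

    def _post_init(self) -> None:
        # Mirrors the diff: pooling runners always opt in to the
        # FD_USE_GET_SAVE_OUTPUT_V1 output path via the environment.
        if self.runner_type == "pooling":
            os.environ["FD_USE_GET_SAVE_OUTPUT_V1"] = "1"

_DemoConfig("pooling")
assert os.environ["FD_USE_GET_SAVE_OUTPUT_V1"] == "1"
```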
1 change: 1 addition & 0 deletions fastdeploy/engine/common_engine.py
@@ -737,6 +737,7 @@ def _fetch_request():
                raise
        # 2. Schedule requests
        tasks = self.resource_manager.schedule()
+
        # 3. Send to engine
        if tasks:
            if self.cfg.scheduler_config.splitwise_role == "decode":
24 changes: 7 additions & 17 deletions fastdeploy/model_executor/layers/pooler.py
@@ -305,19 +305,6 @@ def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], poolin
         return pooled_data


-def build_output(
-    all_data: Union[paddle.Tensor, list[paddle.Tensor]],
-) -> PoolerOutput:
-    # Pooling models D2H & synchronize occurs here
-    if isinstance(all_data, list):
-        all_data = [d.cpu() for d in all_data]
-    else:
-        all_data = all_data.cpu()
-
-    all_outputs = [PoolingSequenceGroupOutput(data) for data in all_data]
-    return PoolerOutput(outputs=all_outputs)
-
-
 class PoolingMethod(nn.Layer, ABC):

     @staticmethod
@@ -473,8 +460,11 @@ def forward(
         pooling_metadata: PoolingMetadata,
     ) -> PoolerOutput:
         pooled_data = self.extract_states(hidden_states, pooling_metadata)
-        pooled_data = self.head(pooled_data, pooling_metadata)
-        return build_output(pooled_data)
+        pooling_params = get_pooling_params(pooling_metadata)
+        assert len(pooled_data) == len(pooling_params)
+
+        pooled_data = [self.head(d, p) for d, p in zip(pooled_data, pooling_params)]
+        return pooled_data


 class SimplePooler(Pooler):
@@ -520,7 +510,7 @@ def forward(
     ) -> PoolerOutput:
         pooled_data = self.pooling(hidden_states, pooling_metadata)
         pooled_data = self.head(pooled_data, pooling_metadata)
-        return build_output(pooled_data)
+        return pooled_data


 class PoolerNormalize(PoolerActivation):
@@ -567,7 +557,7 @@ def forward(
                 hidden_states,
                 pooling_metadata[offset : offset + num_items],
             )
-            outputs.extend(group_output.outputs)
+            outputs.extend(group_output)
             offset += num_items

         return PoolerOutput(outputs)
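The key behavioral change above: the head now runs once per request with that request's own pooling params, instead of once for the whole batch, and the D2H copy that lived in the deleted `build_output` no longer happens here. A minimal sketch of the new pattern, assuming `head` is any callable of `(tensor, pooling_params)` like the heads in this file:

```python
def apply_head_per_request(pooled_data, pooling_params, head):
    # One head invocation per request, so per-request options
    # (e.g. whether to normalize) can differ within a batch.
    assert len(pooled_data) == len(pooling_params)
    return [head(d, p) for d, p in zip(pooled_data, pooling_params)]
```

Wrapping into `PoolerOutput` now only happens where the dispatching pooler collects each `group_output` into `PoolerOutput(outputs)`, and the device-to-host transfer moves to `_build_stream_transfer_data` in pre_and_post_process.py below.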
133 changes: 108 additions & 25 deletions fastdeploy/model_executor/pre_and_post_process.py
@@ -15,7 +15,7 @@
 """

 import queue
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Union

 import numpy as np
 import paddle
@@ -85,6 +85,7 @@
     speculate_limit_thinking_content_length_v2,
 )

+from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput
 from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData
 from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput, SamplerOutput

@@ -238,17 +239,31 @@ def pre_process(
    )


-def _build_stream_transfer_data(output_tokens: np.ndarray):
+def _build_stream_transfer_data(output_tokens: np.ndarray, pooler_outputs: List[PoolingSequenceGroupOutput] = None):
     """Split output_tokens and output"""
-    output_tokens = output_tokens.reshape([-1]).numpy()
-    output_tokens_lists = np.split(output_tokens, output_tokens.shape[0])

     stream_transfer_datas = []
-    for bid, output_token_per_sample in enumerate(output_tokens_lists):
-        stream_transfer_data = StreamTransferData(
-            decoder_state=DecoderState.TEXT, tokens=output_token_per_sample, batch_id=bid
-        )
-        stream_transfer_datas.append(stream_transfer_data)
+    if output_tokens is not None:
+
+        output_tokens = output_tokens.reshape([-1]).numpy()
+        output_tokens_lists = np.split(output_tokens, output_tokens.shape[0])
+
+        for bid, output_token_per_sample in enumerate(output_tokens_lists):
+            stream_transfer_data = StreamTransferData(
+                decoder_state=DecoderState.TEXT, tokens=output_token_per_sample, batch_id=bid
+            )
+            stream_transfer_datas.append(stream_transfer_data)
+    elif pooler_outputs is not None:
+        for bid, pooler_output in enumerate(pooler_outputs):
+            if pooler_output.dtype == paddle.bfloat16:
+                pooler_output = pooler_output.astype("float32")
+
+            pooler_output = pooler_output.numpy()
+
+            stream_transfer_data = StreamTransferData(
+                decoder_state=DecoderState.TEXT, pooler_output=pooler_output, batch_id=bid
+            )
+            stream_transfer_datas.append(stream_transfer_data)
     return stream_transfer_datas

@@ -458,7 +473,7 @@ def post_process_specualate(


 def post_process(
-    sampler_output: SamplerOutput,
+    sampler_or_pooler_output: Union[SamplerOutput, PoolerOutput],
     model_output: ModelOutputData,
     share_inputs: Dict[str, paddle.Tensor],
     block_size: int = 64,
@@ -470,28 +485,40 @@ def post_process(
     line_break_id: int = -1,
 ) -> None:
     """Post-processing steps after completing a single token generation."""
-    if speculative_decoding:
-        post_process_specualate(
-            sampler_output,
-            model_output,
-            share_inputs,
-            save_each_rank,
-            skip_save_output,
-            think_end_id,
-            line_break_id,
-        )
-    else:
-        post_process_normal(
-            sampler_output,
+
+    if isinstance(sampler_or_pooler_output, PoolerOutput):
+        post_process_pooling(
+            sampler_or_pooler_output,
             model_output,
             share_inputs,
             block_size,
             save_each_rank,
             skip_save_output,
             async_output_queue,
-            think_end_id,
-            line_break_id,
         )
+    else:
+        if speculative_decoding:
+            post_process_specualate(
+                sampler_or_pooler_output,
+                model_output,
+                share_inputs,
+                save_each_rank,
+                skip_save_output,
+                think_end_id,
+                line_break_id,
+            )
+        else:
+            post_process_normal(
+                sampler_or_pooler_output,
+                model_output,
+                share_inputs,
+                block_size,
+                save_each_rank,
+                skip_save_output,
+                async_output_queue,
+                think_end_id,
+                line_break_id,
+            )


 def step_cuda(
@@ -775,3 +802,59 @@ def rebuild_padding(
     else:
         raise RuntimeError("Not supported platform")
     return hidden_states
+
+
+def post_process_pooling(
+    pooler_output: PoolerOutput,
+    model_output: ModelOutputData,
+    share_inputs: Dict[str, paddle.Tensor],
+    block_size: int = 64,
+    save_each_rank: bool = False,
+    skip_save_output: bool = False,
+    async_output_queue: queue.Queue = None,
+) -> None:
+
+    paddle.assign(
+        paddle.where(
+            model_output.stop_flags,
+            model_output.step_idx,
+            model_output.step_idx + 1,
+        ),
+        model_output.step_idx,
+    )
+    length_cond = paddle.greater_equal(model_output.step_idx, model_output.max_dec_len)
+
+    paddle.assign(
+        paddle.logical_or(model_output.stop_flags, length_cond),
+        model_output.stop_flags,
+    )
+
+    with paddle.framework._no_check_dy2st_diff():
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            dummy_sampled_tokens = paddle.full_like(model_output.next_tokens, -1, dtype="int64")
+
+            paddle.assign(
+                paddle.ones_like(model_output.stop_flags, dtype="bool"),
+                model_output.stop_flags,
+            )
+            update_inputs_v1(
+                model_output.stop_flags,
+                model_output.not_need_stop,
+                model_output.seq_lens_this_time,
+                model_output.seq_lens_encoder,
+                model_output.seq_lens_decoder,
+                share_inputs["step_seq_lens_decoder"],
+                share_inputs["prompt_lens"],
+                dummy_sampled_tokens,
+                model_output.input_ids,
+                share_inputs["block_tables"],
+                model_output.stop_nums,
+                model_output.next_tokens,
+                model_output.is_block_step,
+                block_size,
+            )
+
+    if not skip_save_output:
+        if save_each_rank or model_output.mp_rank == 0:
+            output = _build_stream_transfer_data(output_tokens=None, pooler_outputs=pooler_output.outputs)
+            async_output_queue.put(output)
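The pooling branch of `_build_stream_transfer_data` is the piece that moves embeddings off-device. A self-contained sketch of just that branch; plain `paddle` tensors and a dict stand in for `PoolingSequenceGroupOutput` and `StreamTransferData`, purely for illustration:

```python
import paddle

def pack_pooler_outputs(pooler_outputs):
    packed = []
    for bid, out in enumerate(pooler_outputs):
        if out.dtype == paddle.bfloat16:
            # numpy has no native bfloat16, so upcast before the host copy
            out = out.astype("float32")
        # .numpy() is where the device-to-host transfer happens
        packed.append({"batch_id": bid, "pooler_output": out.numpy()})
    return packed

embeddings = [paddle.ones([1024]).astype("bfloat16") for _ in range(2)]
assert pack_pooler_outputs(embeddings)[0]["pooler_output"].dtype.name == "float32"
```

Note that `post_process_pooling` still marks every sequence finished (`stop_flags` set to all-ones) and feeds `update_inputs_v1` dummy sampled tokens, since a pooling request produces one embedding and then terminates rather than decoding further.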
2 changes: 1 addition & 1 deletion fastdeploy/output/stream_transfer_data.py
@@ -35,8 +35,8 @@ class StreamTransferData:
     """StreamTransferData"""

     decoder_state: DecoderState
-    tokens: np.array
     batch_id: int
+    tokens: Optional[np.array] = None
     speculaive_decoding: bool = False
     logprobs: Optional[np.array] = None
     accept_tokens: Optional[np.array] = None