Merged

31 commits
7716866  support qwen3-embedding (lizexu123, Sep 22, 2025)
5fde033  merge develop (lizexu123, Oct 10, 2025)
001f23d  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 10, 2025)
73141d4  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 24, 2025)
6a2ddaf  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 24, 2025)
85d14ba  support qwen3-embedding-0.6b (lizexu123, Oct 24, 2025)
8200040  fix (lizexu123, Oct 24, 2025)
5832cc4  update (lizexu123, Oct 27, 2025)
58616e4  fix bug (lizexu123, Oct 27, 2025)
ad2f7b6  fix test_return_token_ids.py and update enable_thinking (lizexu123, Oct 27, 2025)
aeddcac  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 27, 2025)
955fac1  fix mtp dummy_run (lizexu123, Oct 27, 2025)
21c20a7  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 28, 2025)
0206d42  merge develop (lizexu123, Oct 28, 2025)
30795d2  fix np.float32 (lizexu123, Oct 28, 2025)
6bc1ed2  delete FD_DISABLE_CHUNKED_PREFILL and FD_USE_GET_SAVE_OUTPUT_V1 (lizexu123, Oct 28, 2025)
f439ca2  delete and build_stream_transfer_data (lizexu123, Oct 28, 2025)
27d686b  fix test_update_v1: (lizexu123, Oct 28, 2025)
a6a9483  update develop (lizexu123, Oct 28, 2025)
15a0df8  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 29, 2025)
57e76be  fix (lizexu123, Oct 29, 2025)
eae6db6  fix (lizexu123, Oct 29, 2025)
90d5ee1  update dummy_run post_process (lizexu123, Oct 29, 2025)
1a35691  delete test_update_v1 (lizexu123, Oct 29, 2025)
2fa8733  fix (lizexu123, Oct 29, 2025)
5b12f6f  fix dummy_run (lizexu123, Oct 29, 2025)
7ca73ba  fix model_path (lizexu123, Oct 30, 2025)
1e3cae5  fix model_path (lizexu123, Oct 30, 2025)
90ef114  fix dummy_run (lizexu123, Oct 30, 2025)
6c20954  Merge branch 'develop' of https://github.com/PaddlePaddle/FastDeploy … (lizexu123, Oct 30, 2025)
c8c3664  merge develop (lizexu123, Oct 31, 2025)
3 changes: 2 additions & 1 deletion custom_ops/gpu_ops/cpp_extensions.cc
@@ -441,7 +441,8 @@ void UpdateInputsV1(const paddle::Tensor& stop_flags,
const paddle::Tensor& stop_nums,
const paddle::Tensor& next_tokens,
const paddle::Tensor& is_block_step,
const int block_size);
const int block_size,
const bool is_pooling_task);

void RecoverDecodeTask(
const paddle::Tensor& stop_flags,
98 changes: 62 additions & 36 deletions custom_ops/gpu_ops/update_inputs_v1.cu
@@ -33,7 +33,8 @@ __global__ void update_inputs_kernel_v1(bool* not_need_stop,
const int input_ids_stride,
const int block_num_per_seq,
const int block_size,
bool prefill_one_step_stop) {
bool prefill_one_step_stop,
bool is_pooling_task) {
int thread_idx = threadIdx.x;
typedef cub::BlockReduce<int64_t, THREADBLOCK_SIZE> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage;
@@ -48,53 +49,75 @@ __global__ void update_inputs_kernel_v1(bool* not_need_stop,
stop_flag_now_int = 1;
}
}

if (thread_idx < bsz) {
if (stop_flag_now) {
seq_lens_this_time[thread_idx] = 0; // stop at next step
seq_lens_decoder[thread_idx] = 0;
seq_lens_encoder[thread_idx] = 0;
} else {
if (seq_lens_this_time[thread_idx] + seq_lens_decoder[thread_idx] >=
prompt_lens[thread_idx]) {
if (prefill_one_step_stop) {
// prefill done, stop
stop_flags[thread_idx] = true;
seq_lens_this_time[thread_idx] = 0;
seq_lens_decoder[thread_idx] = 0;
seq_lens_encoder[thread_idx] = 0;
stop_flag_now_int = 1;
} else {
// decoding
seq_lens_decoder[thread_idx] += seq_lens_this_time[thread_idx];
seq_lens_this_time[thread_idx] = 1;
seq_lens_encoder[thread_idx] = 0;
int64_t* input_ids_now = input_ids + thread_idx * input_ids_stride;
input_ids_now[0] = next_tokens[thread_idx];
if (is_pooling_task) {
Review comment (Collaborator): What logic change was made to this operator?

Reply (lizexu123, Oct 28, 2025): For pooling tasks, every entry of seq_lens_encoder is set to 0 so that exist_prefill evaluates to 0, which fixes a hang.

if (seq_lens_this_time[thread_idx] > 0) {
int total_processed =
seq_lens_this_time[thread_idx] + seq_lens_decoder[thread_idx];

// to judge whether block is not enough
int* block_table_now = block_tables + thread_idx * block_num_per_seq;
if (seq_lens_this_time[thread_idx] != 0 &&
block_table_now[seq_lens_decoder[thread_idx] / block_size] ==
-1) {
// should be scheduled by server
is_block_step[thread_idx] = true;
seq_lens_this_time[thread_idx] = 0;
if (total_processed >= prompt_lens[thread_idx]) {
stop_flags[thread_idx] = true;
step_seq_lens_decoder[thread_idx] = seq_lens_decoder[thread_idx];
seq_lens_encoder[thread_idx] = 0;
seq_lens_decoder[thread_idx] = 0;
seq_lens_this_time[thread_idx] = 0;
stop_flag_now_int = 1;
}
} else {
seq_lens_encoder[thread_idx] = 0;
stop_flag_now_int = 1;
}
} else {
stop_flags[thread_idx] = true;
seq_lens_this_time[thread_idx] = 0;
seq_lens_decoder[thread_idx] = 0;
seq_lens_encoder[thread_idx] = 0;
topk_ids[thread_idx] = -1;
stop_flag_now_int = 1;
// Normal generation task logic
if (seq_lens_this_time[thread_idx] + seq_lens_decoder[thread_idx] >=
prompt_lens[thread_idx]) {
if (prefill_one_step_stop) {
// prefill done, stop
stop_flags[thread_idx] = true;
seq_lens_this_time[thread_idx] = 0;
seq_lens_decoder[thread_idx] = 0;
seq_lens_encoder[thread_idx] = 0;
stop_flag_now_int = 1;
} else {
// decoding
seq_lens_decoder[thread_idx] += seq_lens_this_time[thread_idx];
seq_lens_this_time[thread_idx] = 1;
seq_lens_encoder[thread_idx] = 0;
int64_t* input_ids_now = input_ids + thread_idx * input_ids_stride;
input_ids_now[0] = next_tokens[thread_idx];

// to judge whether block is not enough
int* block_table_now =
block_tables + thread_idx * block_num_per_seq;
if (seq_lens_this_time[thread_idx] != 0 &&
block_table_now[seq_lens_decoder[thread_idx] / block_size] ==
-1) {
// should be scheduled by server
is_block_step[thread_idx] = true;
seq_lens_this_time[thread_idx] = 0;
stop_flags[thread_idx] = true;
step_seq_lens_decoder[thread_idx] = seq_lens_decoder[thread_idx];
seq_lens_decoder[thread_idx] = 0;
stop_flag_now_int = 1;
}
}
} else {
stop_flags[thread_idx] = true;
seq_lens_this_time[thread_idx] = 0;
seq_lens_decoder[thread_idx] = 0;
seq_lens_encoder[thread_idx] = 0;
topk_ids[thread_idx] = -1;
stop_flag_now_int = 1;
}
}
}
}

__syncthreads();
int64_t stop_sum = BlockReduce(temp_storage).Sum(stop_flag_now_int);
if (thread_idx == 0) {
@@ -115,7 +138,8 @@ void UpdateInputsV1(const paddle::Tensor& stop_flags,
const paddle::Tensor& stop_nums,
const paddle::Tensor& next_tokens,
const paddle::Tensor& is_block_step,
const int block_size) {
const int block_size,
const bool is_pooling_task) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
auto dev_ctx = static_cast<const phi::CustomContext*>(
paddle::experimental::DeviceContextPool::Instance().Get(
@@ -132,6 +156,7 @@ void UpdateInputsV1(const paddle::Tensor& stop_flags,
}
const int max_bsz = stop_flags.shape()[0];
const int now_bsz = seq_lens_this_time.shape()[0];
const int bsz_to_process = is_pooling_task ? max_bsz : now_bsz;
const int input_ids_stride = input_ids.shape()[1];
const int block_num_per_seq = block_tables.shape()[1];
auto not_need_stop_gpu = not_need_stop.copy_to(stop_flags.place(), false);
@@ -149,12 +174,13 @@ void UpdateInputsV1(const paddle::Tensor& stop_flags,
const_cast<bool*>(stop_flags.data<bool>()),
const_cast<bool*>(is_block_step.data<bool>()),
next_tokens.data<int64_t>(),
now_bsz,
bsz_to_process,
Review comment (Collaborator): Why does this need to change to max_bsz?

Reply (lizexu123, Author): All max-num-seqs entries of seq_lens_encoder need to be set to 0 here.

max_bsz,
input_ids_stride,
block_num_per_seq,
block_size,
prefill_one_step_stop);
prefill_one_step_stop,
is_pooling_task);
auto not_need_stop_cpu =
not_need_stop_gpu.copy_to(not_need_stop.place(), false);
bool* not_need_stop_data = const_cast<bool*>(not_need_stop.data<bool>());
@@ -175,7 +201,7 @@ PD_BUILD_STATIC_OP(update_inputs_v1)
"stop_nums",
"next_tokens",
"is_block_step"})
.Attrs({"block_size: int"})
.Attrs({"block_size: int", "is_pooling_task: bool"})
.Outputs({"not_need_stop_out",
"seq_lens_this_time_out",
"seq_lens_encoder_out",
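Taken together, the two review threads in this file's diff explain the intent: pooling requests have no decode phase, so the kernel must both stop them once the prompt is consumed and clear seq_lens_encoder across all max_bsz slots (hence bsz_to_process), so that no stale value keeps reporting a pending prefill. Below is a hedged NumPy sketch of that per-slot logic; it is an illustration only, not FastDeploy's host code, with array names mirroring the kernel.

```python
import numpy as np

def update_inputs_pooling(stop_flags, seq_lens_this_time, seq_lens_encoder,
                          seq_lens_decoder, step_seq_lens_decoder,
                          prompt_lens, max_bsz):
    for i in range(max_bsz):  # iterate over all slots, not just the active batch
        if stop_flags[i]:
            seq_lens_this_time[i] = 0
            seq_lens_decoder[i] = 0
            seq_lens_encoder[i] = 0
        elif seq_lens_this_time[i] > 0:
            total = seq_lens_this_time[i] + seq_lens_decoder[i]
            if total >= prompt_lens[i]:
                # Pooling has no decode phase: once the whole prompt has
                # been processed, the request simply stops.
                stop_flags[i] = True
                step_seq_lens_decoder[i] = seq_lens_decoder[i]
                seq_lens_encoder[i] = 0
                seq_lens_decoder[i] = 0
                seq_lens_this_time[i] = 0
        else:
            # Inactive slot: clear the encoder length too, so a check like
            # exist_prefill = (seq_lens_encoder > 0).any() cannot keep
            # seeing a phantom prefill and hang the engine loop.
            seq_lens_encoder[i] = 0

# Tiny usage example: one finished pooling request plus one padding slot.
stop_flags = np.array([False, False])
seq_lens_this_time = np.array([4, 0])
seq_lens_encoder = np.array([4, 7])   # stale value in the padding slot
seq_lens_decoder = np.array([0, 0])
step_seq_lens_decoder = np.array([0, 0])
prompt_lens = np.array([4, 0])
update_inputs_pooling(stop_flags, seq_lens_this_time, seq_lens_encoder,
                      seq_lens_decoder, step_seq_lens_decoder,
                      prompt_lens, max_bsz=2)
assert seq_lens_encoder.tolist() == [0, 0]  # no phantom prefill remains
```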
36 changes: 20 additions & 16 deletions fastdeploy/engine/common_engine.py
@@ -737,6 +737,7 @@ def _fetch_request():
raise
# 2. Schedule requests
tasks = self.resource_manager.schedule()

# 3. Send to engine
if tasks:
if self.cfg.scheduler_config.splitwise_role == "decode":
@@ -886,24 +887,27 @@ def _zmq_send_generated_tokens(self):
for request_id, contents in results.items():
new_contents = []
for content in contents:
decode_type = content.outputs.decode_type
delta_text = ""
if decode_type == 0:
delta_text, token_ids = self._decode_token(
token_ids=content.outputs.token_ids, req_id=request_id, is_end=content.finished
)
if isinstance(content, RequestOutput):
Review comment (Collaborator): State explicitly which type the else branch handles, then add a final else that raises an error.

Reply (lizexu123, Author): Right now everything that is not a generation output takes the else branch. We only have these two output types today; future ones such as reward models will also go through else. This follows Sun Lei's earlier change.

decode_type = content.outputs.decode_type
delta_text = ""
if decode_type == 0:
delta_text, token_ids = self._decode_token(
token_ids=content.outputs.token_ids, req_id=request_id, is_end=content.finished
)
else:
token_ids = content.outputs.token_ids
if len(token_ids):
content.outputs.token_ids = token_ids
content.outputs.text = delta_text
new_contents.append(content)
elif content.finished:
new_contents.append(content)
else:
llm_logger.warning(
f"current tokens need to accumulate, req_id: {request_id} {content.outputs.token_ids}"
)
else:
token_ids = content.outputs.token_ids
if len(token_ids):
content.outputs.token_ids = token_ids
content.outputs.text = delta_text
new_contents.append(content)
elif content.finished:
new_contents.append(content)
else:
llm_logger.warning(
f"current tokens need to accumulate, req_id: {request_id} {content.outputs.token_ids}"
)
if len(new_contents):
llm_logger.info(f"Send response for request id: {request_id}")
self.send_response_server.send_response(request_id, new_contents)
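For reference, here is a hedged sketch of the dispatch pattern the reviewer asked for. PoolingRequestOutput is a stand-in name, not a class this PR introduces; the merged code keeps a bare else that covers every non-generation output.

```python
# Stand-in classes so the sketch is self-contained; the real types live in
# FastDeploy's request/output modules.
class RequestOutput: ...            # generation output type (real name)
class PoolingRequestOutput: ...     # hypothetical pooling output type

def dispatch(content) -> str:
    if isinstance(content, RequestOutput):
        return "generation"   # incremental detokenization path
    if isinstance(content, PoolingRequestOutput):
        return "pooling"      # embeddings forwarded without detokenization
    raise TypeError(f"unexpected output type: {type(content).__name__}")

assert dispatch(RequestOutput()) == "generation"
assert dispatch(PoolingRequestOutput()) == "pooling"
```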
24 changes: 7 additions & 17 deletions fastdeploy/model_executor/layers/pooler.py
@@ -305,19 +305,6 @@ def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], poolin
return pooled_data


def build_output(
all_data: Union[paddle.Tensor, list[paddle.Tensor]],
) -> PoolerOutput:
# Pooling models D2H & synchronize occurs here
if isinstance(all_data, list):
all_data = [d.cpu() for d in all_data]
else:
all_data = all_data.cpu()

all_outputs = [PoolingSequenceGroupOutput(data) for data in all_data]
return PoolerOutput(outputs=all_outputs)


class PoolingMethod(nn.Layer, ABC):

@staticmethod
@@ -473,8 +460,11 @@ def forward(
pooling_metadata: PoolingMetadata,
) -> PoolerOutput:
pooled_data = self.extract_states(hidden_states, pooling_metadata)
pooled_data = self.head(pooled_data, pooling_metadata)
return build_output(pooled_data)
pooling_params = get_pooling_params(pooling_metadata)
assert len(pooled_data) == len(pooling_params)

pooled_data = [self.head(d, p) for d, p in zip(pooled_data, pooling_params)]
return pooled_data


class SimplePooler(Pooler):
@@ -520,7 +510,7 @@ def forward(
) -> PoolerOutput:
pooled_data = self.pooling(hidden_states, pooling_metadata)
pooled_data = self.head(pooled_data, pooling_metadata)
return build_output(pooled_data)
return pooled_data


class PoolerNormalize(PoolerActivation):
@@ -567,7 +557,7 @@ def forward(
hidden_states,
pooling_metadata[offset : offset + num_items],
)
outputs.extend(group_output.outputs)
outputs.extend(group_output)
offset += num_items

return PoolerOutput(outputs)
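The diff above deletes build_output, so pooler heads now return raw per-request tensors and the caller wraps everything once at the end (the final return PoolerOutput(outputs)). A minimal sketch of that assembly, with assumed names; where exactly the device-to-host copy now happens is outside this diff, so the .cpu() call below only mirrors what the removed build_output used to do.

```python
import paddle

def assemble_pooler_output(per_request):
    # Single D2H/synchronize point, analogous to the old build_output,
    # instead of each pooler head copying its own result to host.
    return [t.cpu() for t in per_request]

pooled = [paddle.rand([8]), paddle.rand([8])]  # e.g. two pooled embeddings
outputs = assemble_pooler_output(pooled)
assert len(outputs) == 2
```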