[Cherry-Pick][Speculative Decoding] Support mtp super ultra overlap in pd-split mode with insert_task overlap(#7323)#7441
Conversation
…de with insert_task overlap (PaddlePaddle#7323) * support mtp overlap in pd-split mode with insert_task overlap
Merged commit b440150 into PaddlePaddle:release/online/20260415
Pull request overview
This PR (cherry-picked from #7323) mainly adjusts how inputs are written when inserting tasks for speculative decoding in PD-split mode (especially the SUFFIX branch) to achieve deeper overlap (many writes switched to async_set_value), and adds the corresponding unit tests. It also improves async_expert_loader compatibility with cuda-python 13.x.
Changes:
- In `GPUModelRunner.insert_tasks_v1` and `MTPProposer.insert_tasks_v1`, switch many `share_inputs`/`model_inputs` writes to `async_set_value(..., blocking=False)` to reduce synchronization overhead and support overlap.
- In the PD-split decode + `SpecMethod.SUFFIX` branch, adjust how draft tokens are written and how `seq_lens_this_time_buffer` is set, and add a unit test covering this branch.
- Add an import-path fallback in `async_expert_loader` for cuda-python 13.x (cuda-bindings).
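The write pattern the changes above rely on can be sketched with a toy numpy stand-in. This is an assumed model of `async_set_value`'s behavior, not the actual FastDeploy implementation: the real helper enqueues a non-blocking host-to-device copy when `blocking=False`; the shape-alignment step shown here is the part the review comments below focus on.

```python
import numpy as np

def async_set_value(tgt, value, blocking=False):
    # Toy stand-in (assumed behavior, not FastDeploy's code): align the
    # source to the target slice's shape, then assign. The real helper
    # would enqueue a non-blocking device copy when blocking=False.
    src = np.asarray(value)
    if tuple(src.shape) != tuple(tgt.shape):
        src = src.reshape(tuple(tgt.shape))
    tgt[...] = src
    return tgt

# Writing one request's slot in a shared input buffer:
share_inputs = {"top_k_list": np.zeros(4, dtype=np.int64)}
async_set_value(share_inputs["top_k_list"][1:2], [40], blocking=False)
```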
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/worker/test_gpu_model_runner.py | Adds a unit test for `insert_tasks_v1` behavior in the PD-split decode + SUFFIX branch, using a synchronous mock in place of `async_set_value` |
| fastdeploy/worker/gpu_model_runner.py | Switches many `insert_tasks_v1` writes to `async_set_value`; adjusts `draft_tokens`/`seq_lens` writes in the SUFFIX branch; adds whole-block write logic for `stop_seqs` |
| fastdeploy/spec_decode/mtp.py | Switches MTP `insert_tasks_v1` writes to `async_set_value` and guards the `input_ids_cpu` write under `hybrid_mode` |
| fastdeploy/model_executor/xpu_pre_and_post_process.py | Adds an XPU-side `async_set_value` implementation, used by the XPU branch in mtp.py |
| fastdeploy/model_executor/pre_and_post_process.py | Makes `async_set_value` available across platforms (no longer CUDA-only) and fixes the list/ndarray type check |
| fastdeploy/eplb/async_expert_loader.py | Adds a cudart import fallback for cuda-python 13.x (`cuda.bindings.runtime`) and updates the missing-dependency hint |
    _cuda_ver = getattr(_cuda_pkg, "__version__", None)
    if _cuda_ver is None:
        # cuda-python >= 13.x 无顶层 __version__,通过 cuda-bindings 子包判断
The comment added here (about the cuda-python 13.x version check) is in Chinese, but the repository convention is English code comments. Please translate it to English so that non-Chinese readers can understand and maintain it.
Suggested change:

    - # cuda-python >= 13.x 无顶层 __version__,通过 cuda-bindings 子包判断
    + # cuda-python >= 13.x does not expose a top-level __version__; detect it via the cuda-bindings package
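The compatibility change can be sketched as a generic "first importable module" fallback. `import_first` is a hypothetical helper, not the PR's actual code; the `cuda.bindings.runtime` / `cuda.cudart` pair mirrors the cudart import paths discussed above, and the runnable example below falls back to a stdlib module since cuda-python may not be installed.

```python
import importlib

def import_first(*names):
    # Hypothetical helper: return the first module in `names` that imports,
    # mirroring the cudart fallback between cuda.bindings.runtime
    # (cuda-python >= 13.x) and the older cuda.cudart layout.
    last_error = None
    for name in names:
        try:
            return importlib.import_module(name)
        except ImportError as exc:
            last_error = exc
    raise ImportError(f"none of {names} could be imported") from last_error

# In the PR's setting this would be roughly:
#   cudart = import_first("cuda.bindings.runtime", "cuda.cudart")
json_mod = import_first("definitely_missing_module_xyz", "json")
```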
    self.share_inputs["top_p_normalized_logprobs"][idx : idx + 1] = request.get(
        "top_p_normalized_logprobs", False
    )
    self.share_inputs["top_k_list"][idx] = request.get("top_k", 0)
    async_set_value(self.share_inputs["eos_token_id"][:], request.eos_token_ids)
Here `async_set_value` is passed a 1-D list for `eos_token_id`, but the `eos_token_id` tensor is usually shaped `[eos_tokens_lens, 1]` (2-D). On the CUDA path, `async_set_value` takes the `custom_numpy_to_tensor` fast path and returns early, possibly bypassing the reshape, which can lead to a shape mismatch or a misaligned copy. Consider reshaping `eos_token_ids` to `(eos_tokens_lens, 1)` before the write, or making `async_set_value` align shapes on the fast path as well.
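A minimal numpy sketch of the fix suggested above, assuming the buffer shape described in the comment (`[eos_tokens_lens, 1]`); the token ids are illustrative values, not taken from the PR:

```python
import numpy as np

eos_token_ids = [2, 102]  # illustrative values, not from the PR
eos_buf = np.zeros((len(eos_token_ids), 1), dtype=np.int64)  # [eos_tokens_lens, 1]

# Reshape the flat list to the 2-D target before the (fast-path) copy:
src = np.asarray(eos_token_ids, dtype=np.int64).reshape(eos_buf.shape)
np.copyto(eos_buf, src)
```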
    # 每条 stop sequence pad 到 stop_seqs_max_len,凑齐空行后整块写入
    # 避免对第 3 维做部分切片(非连续内存)导致 async_set_value stride 错位
    stop_token_ids = request.get("stop_token_ids")
    max_len = self.model_config.stop_seqs_max_len
    padded = [seq + [-1] * (max_len - len(seq)) for seq in stop_token_ids]
    padded.extend([[-1] * max_len] * (self.model_config.max_stop_seqs_num - stop_seqs_num))
The `padded` value written here is a 2-D list (roughly `[max_stop_seqs_num, max_len]`), but the target slice is 3-D `[1, max_stop_seqs_num, max_len]`. If the CUDA fast path in `async_set_value` calls `custom_numpy_to_tensor` directly and skips the reshape, this can cause a shape mismatch or a wrong data layout. Consider wrapping `padded` in a batch dimension (e.g. `[padded]`), explicitly reshaping it to the target shape, or fixing the `async_set_value` fast path to reshape first.
Suggested change:

    - # 每条 stop sequence pad 到 stop_seqs_max_len,凑齐空行后整块写入
    - # 避免对第 3 维做部分切片(非连续内存)导致 async_set_value stride 错位
    + # Pad each stop sequence to stop_seqs_max_len, then write the full block.
    + # Keep the input shape aligned with the 3D target slice to avoid
    + # CUDA fast-path shape/stride mismatches in async_set_value.
      stop_token_ids = request.get("stop_token_ids")
      max_len = self.model_config.stop_seqs_max_len
      padded = [seq + [-1] * (max_len - len(seq)) for seq in stop_token_ids]
      padded.extend([[-1] * max_len] * (self.model_config.max_stop_seqs_num - stop_seqs_num))
    + padded = np.asarray(padded, dtype="int64").reshape(1, self.model_config.max_stop_seqs_num, max_len)
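The padding logic in the suggestion above can be exercised standalone with numpy. The config values here are illustrative stand-ins for `model_config.stop_seqs_max_len` and `model_config.max_stop_seqs_num`:

```python
import numpy as np

max_len = 4            # stands in for model_config.stop_seqs_max_len
max_stop_seqs_num = 3  # stands in for model_config.max_stop_seqs_num
stop_token_ids = [[11, 12], [13]]
stop_seqs_num = len(stop_token_ids)

# Pad each stop sequence to max_len, fill the remaining rows with -1,
# then reshape to the 3-D target slice shape [1, max_stop_seqs_num, max_len]:
padded = [seq + [-1] * (max_len - len(seq)) for seq in stop_token_ids]
padded.extend([[-1] * max_len] * (max_stop_seqs_num - stop_seqs_num))
block = np.asarray(padded, dtype=np.int64).reshape(1, max_stop_seqs_num, max_len)
```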
    # 每条 stop sequence pad 到 stop_seqs_max_len,凑齐空行后整块写入
    # 避免对第 3 维做部分切片(非连续内存)导致 async_set_value stride 错位
    stop_token_ids = request.get("stop_token_ids")
    max_len = self.model_config.stop_seqs_max_len
    padded = [seq + [-1] * (max_len - len(seq)) for seq in stop_token_ids]
    padded.extend([[-1] * max_len] * (self.model_config.max_stop_seqs_num - stop_seqs_num))
    async_set_value(self.share_inputs["stop_seqs"][idx : idx + 1, :, :], padded)
The code comments added here are in Chinese, but the repository convention is English code comments, to avoid hindering cross-team collaboration and future maintenance. Please translate them to English and keep the terminology (stop sequence / pad / stride) consistent.
    if current_platform.is_cuda():
        if str(src.dtype) != dtype_str:
            srt_tensor = paddle.empty(tgt.shape, dtype=str(src.dtype))
            src = custom_numpy_to_tensor(src, srt_tensor)
        else:
            return custom_numpy_to_tensor(src, tgt)
On CUDA with a matching dtype, `async_set_value` directly returns `custom_numpy_to_tensor(src, tgt)`, which bypasses the subsequent shape/reshape logic. For the common pattern where `tgt` is 2-D/3-D and `src` is 1-D/2-D (e.g. `eos_token_id`, `stop_seqs`), this can cause a shape mismatch or a misaligned copy. Consider ensuring `src` is reshaped to `tgt.shape` before entering `custom_numpy_to_tensor`, or removing the early return so everything goes through the reshape + `copy_` path.
Suggested change:

    - if current_platform.is_cuda():
    -     if str(src.dtype) != dtype_str:
    -         srt_tensor = paddle.empty(tgt.shape, dtype=str(src.dtype))
    -         src = custom_numpy_to_tensor(src, srt_tensor)
    -     else:
    -         return custom_numpy_to_tensor(src, tgt)
    + target_shape = tuple(tgt.shape)
    + if src.shape != target_shape:
    +     src = np.reshape(src, target_shape)
    + if current_platform.is_cuda():
    +     tensor_dtype = str(src.dtype) if str(src.dtype) != dtype_str else tgt.dtype
    +     src_tensor = paddle.empty(target_shape, dtype=tensor_dtype)
    +     src = custom_numpy_to_tensor(src, src_tensor)
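The risk described in this comment can be reproduced with plain numpy: a flat, memcpy-style write that ignores the view's strides lands data in cells outside the slice, while a shape-aligned strided assignment does not. Buffer sizes here are illustrative, and the "raw fast-path copy" is simulated on the host rather than via an actual device memcpy:

```python
import numpy as np

# A non-contiguous view: slicing the last dimension, as with stop_seqs.
buf = np.full((2, 3), -1, dtype=np.int64)
view = buf[:, 0:2]
src = np.array([1, 2, 3, 4], dtype=np.int64)  # payload meant for the view

# Simulate a raw fast-path copy from the view's base pointer: it ignores
# strides, so element 3 leaks into the column excluded from the view.
buf.reshape(-1)[: src.size] = src
# buf is now [[1, 2, 3], [4, -1, -1]]; the view reads back [[1, 2], [4, -1]]

# The safe path: reshape the source to the view's shape first.
buf2 = np.full((2, 3), -1, dtype=np.int64)
buf2[:, 0:2] = src.reshape(2, 2)
# buf2 is [[1, 2, -1], [3, 4, -1]]; the slice holds [[1, 2], [3, 4]]
```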
…de with insert_task overlap (#7323)
Motivation
Modifications
Usage or Command
Accuracy Tests
Checklist
- Use one of the PR title tags: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run `pre-commit` before commit.
- For a `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.