Skip to content

Commit 0f75b62

Browse files
[BugFix] Fix profile run in pd-disaggregated deployment (#4584)
* [fix] fix pd+dp+ep bug * [fix] fix again * [ci] fix code style
1 parent 64e875b commit 0f75b62

File tree

4 files changed

+32
-4
lines changed

4 files changed

+32
-4
lines changed

fastdeploy/engine/common_engine.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,9 @@ def _fetch_request():
760760
else:
761761
time.sleep(0.005)
762762

763+
except RuntimeError as e:
764+
if "cannot schedule new futures after shutdown" in str(e):
765+
break
763766
except Exception as e:
764767
err_msg = "Error happened while inserting task into engine: {}, {}.".format(e, str(traceback.format_exc()))
765768
self.llm_logger.error(err_msg)

fastdeploy/engine/engine.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,25 @@ def start(self, api_server_pid=None):
101101
Initializes the engine and starts its sub-services.
102102
If `api_server_pid` is defined, will launch a thread
103103
to keep getting request from zmq_server.
104+
105+
NOTE: To clarify the launch order of the components of the LLM engine:
106+
1. First, launch splitwise scheduler (if necessary) and expert services (if necessary).
107+
2. Then, launch common engine, which includes some background threads that insert tasks and receive outputs.
108+
3. Most importantly, launch workers and cache services. Their launch order is listed as follows.
109+
110+
| Profile | Mixed | PrefixCache | Cache -> Worker | Worker -> Cache |
111+
|---------|-------|-------------|-----------------|-----------------|
112+
| 1 | 1 | 1 | 0 | 1 |
113+
| 1 | 1 | 0 | 0 | 0 |
114+
| 1 | 0 | 1 | 0 | 1 |
115+
| 1 | 0 | 0 | 0 | 1 |
116+
| 0 | 1 | 1 | 0 | 1 |
117+
| 0 | 1 | 0 | 0 | 0 |
118+
| 0 | 0 | 1 | 1 | 0 |
119+
| 0 | 0 | 0 | 1 | 0 |
120+
121+
4. Finally, inform the user that the engine has successfully started.
122+
104123
"""
105124
assert not self.is_started, "The engine is already started."
106125
start_time = time.time()
@@ -109,7 +128,6 @@ def start(self, api_server_pid=None):
109128
self.ipc_signal_suffix = self.cfg.parallel_config.engine_worker_queue_port[0]
110129
self._init_worker_signals()
111130

112-
# Launch components: scheduler, cache_manager, expert_service, et al.
113131
self.launch_components()
114132

115133
self.engine.start()
@@ -151,7 +169,7 @@ def check_worker_initialize_status_func(res: dict):
151169
# and then start the cache manager
152170
if self.do_profile:
153171
self._stop_profile()
154-
elif self.cfg.cache_config.enable_prefix_caching:
172+
elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching:
155173
device_ids = self.cfg.parallel_config.device_ids.split(",")
156174
self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix)
157175

fastdeploy/inter_communicator/ipc_signal.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,15 @@ def __init__(
8080
name = name + f".{suffix}"
8181

8282
if create:
83+
llm_logger.debug(f"creating ipc signal: {name}")
8384
if shared_memory_exists(name):
8485
llm_logger.warning(f"ShareMemory: {name} already exists, delete it")
8586
SharedMemory(name=name, create=False).unlink()
8687
self.shm = SharedMemory(create=True, size=array.nbytes, name=name)
8788
self.value: np.ndarray = np.ndarray(array.shape, dtype=array.dtype, buffer=self.shm.buf)
8889
self.value[:] = array # Initialize with input array data
8990
else:
91+
llm_logger.debug(f"attaching ipc signal: {name}")
9092
self.shm = SharedMemory(name=name)
9193
self.value: np.ndarray = np.ndarray(array.shape, dtype=array.dtype, buffer=self.shm.buf)
9294

fastdeploy/worker/worker_process.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@
4141
StructuredOutputsConfig,
4242
)
4343
from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
44-
from fastdeploy.inter_communicator import ExistTaskStatus, IPCSignal, ModelWeightsStatus
44+
from fastdeploy.inter_communicator import (
45+
ExistTaskStatus,
46+
IPCSignal,
47+
ModelWeightsStatus,
48+
shared_memory_exists,
49+
)
4550
from fastdeploy.model_executor.layers.quantization import parse_quant_config
4651
from fastdeploy.model_executor.utils import v1_loader_support
4752
from fastdeploy.platforms import current_platform
@@ -426,7 +431,7 @@ def graph_optimize_and_warm_up_model(self) -> None:
426431
array=prefilled_step_idx_data,
427432
dtype=np.int32,
428433
suffix=gpu_id,
429-
create=False,
434+
create=not shared_memory_exists(prefilled_step_name),
430435
)
431436
step_shm_value.value[0] = -1
432437

0 commit comments

Comments
 (0)