
Commit 136bb1b

Optimize sleep level=1 for turbomind backend (#4074)
* use pinned buffer to store weights
* add /is_sleeping endpoint
* update pt
* fix comments
1 parent 281e101 commit 136bb1b

File tree

6 files changed (+38, -14 lines)


lmdeploy/pytorch/engine/model_agent.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -1052,7 +1052,8 @@ def sleep(self, level: int = 1):
         self.cache_engine = None
         self.reset_graph_runner()
         device = 'cpu' if level == 1 else 'meta'
-        self.patched_model.get_model().to(device=device)
+        self.patched_model.get_model().to(device=device, non_blocking=True)
+        torch.cuda.synchronize()
         torch.cuda.empty_cache()

     @torch.inference_mode()
```
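
Note the two-step pattern introduced here: a `non_blocking=True` copy only overlaps with other work when the destination memory is pinned, and the explicit `torch.cuda.synchronize()` guarantees the offload has finished before `empty_cache()` returns the freed blocks to the driver. A standalone sketch of the same pattern (tensor shapes and names are illustrative):

```python
import torch

# Minimal sketch of the async-offload pattern above (illustrative names).
weight = torch.randn(4096, 4096, device='cuda')

# Device->host copies are only truly asynchronous into pinned host memory.
host_buf = torch.empty_like(weight, device='cpu', pin_memory=True)
host_buf.copy_(weight, non_blocking=True)  # enqueued on the current stream

torch.cuda.synchronize()   # wait for the copy to land before freeing memory
del weight
torch.cuda.empty_cache()   # now safe to return freed blocks to the driver
```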

lmdeploy/serve/async_engine.py

Lines changed: 11 additions & 1 deletion
```diff
@@ -332,6 +332,8 @@ def __init__(self,
         else:
             raise ValueError(f'unsupported backend {backend}')
         self.backend_config = self.engine.engine_config
+        self.is_sleeping = backend_config.empty_init
+        self.sleeping_tags: set[str] = set() if not backend_config.empty_init else {'weights', 'kv_cache'}
         logger.info(f'updated backend_config={self.backend_config}')

         # parameters for member functions
@@ -491,6 +493,8 @@ def sleep(self, level: int = 1):
                 discard both the model weights and the kv cache.
         """
         self.engine.sleep(level)
+        self.sleeping_tags = {'weights', 'kv_cache'}
+        self.is_sleeping = True

     def wakeup(self, tags: Optional[List[str]] = None):
         """Wake up the model.
@@ -502,11 +506,17 @@
                 wake_up should be called with all tags (or None) before the
                 engine is used again.
         """
+        tags = tags or list(self.sleeping_tags)
+        if any(tag not in self.sleeping_tags for tag in tags):
+            logger.warning(f'some tag in {tags} not in sleeping tags {self.sleeping_tags}')
+            return
         self.engine.wakeup(tags)
         # for TM backend, sleep/wakeup will reset gateway, therefore we need to rebuild instance
-        if self.backend == 'turbomind' and (tags is None or 'kv_cache' in tags):
+        if self.backend == 'turbomind' and 'kv_cache' in tags:
             self.instances = [self.engine.create_instance() for _ in range(self.instance_num)]
             self.free_insts = None
+        self.sleeping_tags = self.sleeping_tags - set(tags)
+        self.is_sleeping = bool(self.sleeping_tags)

     def _get_limiter(self):
         if not self.limiter:
```
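
Taken together, `sleeping_tags` tracks which resources are still offloaded: `sleep` sets both tags, a partial `wakeup` clears only the tags it restored, and `is_sleeping` stays true until the set is empty. A hypothetical usage sketch (`engine` stands in for a constructed `AsyncEngine`; construction is omitted):

```python
# Hypothetical usage of the sleep/wakeup tag bookkeeping above.
engine.sleep(level=1)                    # offload weights, drop the kv cache
assert engine.sleeping_tags == {'weights', 'kv_cache'}
assert engine.is_sleeping

engine.wakeup(tags=['weights'])          # partial wakeup: restore weights only
assert engine.sleeping_tags == {'kv_cache'}
assert engine.is_sleeping                # still sleeping until all tags wake

engine.wakeup()                          # None/empty -> wake remaining tags
assert not engine.is_sleeping
```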

lmdeploy/serve/openai/api_server.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -1115,6 +1115,12 @@ async def wakeup(raw_request: Request = None):
     return Response(status_code=200)


+@router.get('/is_sleeping', dependencies=[Depends(check_api_key)])
+async def is_sleeping(raw_request: Request = None):
+    is_sleeping = VariableInterface.async_engine.is_sleeping
+    return JSONResponse(content={'is_sleeping': is_sleeping})
+
+
 """ PD Disaggregation API Begin """
```
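
This gives clients a way to poll the server's sleep state between `/sleep` and `/wakeup` calls. A minimal client sketch, assuming the default port and an optional API key (both placeholders):

```python
import requests

# Minimal client sketch; base URL and key are placeholders.
resp = requests.get(
    'http://localhost:23333/is_sleeping',
    headers={'Authorization': 'Bearer YOUR_API_KEY'},  # only if keys are set
)
print(resp.json())  # e.g. {'is_sleeping': False}
```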

lmdeploy/turbomind/turbomind.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -213,6 +213,7 @@ def _get_model_params(self):

         model_comm = self.model_comm
         tm_params = self._tm_model.tm_params
+        tm_params.clear()

         def _get_params(device_id, que):
             rank = self.node_id * self.gpu_count + device_id
```
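
Presumably the new `tm_params.clear()` makes parameter collection idempotent across sleep/wakeup cycles, so re-reading the weights after a wakeup does not append to entries left over from the previous cycle. A toy sketch of that idea, under the assumption that `tm_params` behaves like a name-to-tensors registry:

```python
# Toy sketch, assuming tm_params acts as a name -> tensors registry that
# gets repopulated on every collection pass.
tm_params: dict[str, list] = {}

def collect_params(named_tensors) -> None:
    tm_params.clear()  # without this, a second collection after wakeup
                       # would append duplicates from the previous cycle
    for name, tensor in named_tensors:
        tm_params.setdefault(name, []).append(tensor)
```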

src/turbomind/models/llama/LlamaWeight.cc

Lines changed: 14 additions & 12 deletions
```diff
@@ -116,6 +116,7 @@ void LlamaWeight::release()
     }

     decoder_layer_weights.clear();
+    pinned_weights_.clear();

     // Wait for deallocations
     core::Context::stream().Sync();
@@ -127,21 +128,22 @@

 void LlamaWeight::to_device(const core::Device& device)
 {
-    core::ContextGuard guard = context();
-
-    auto to_device = [&](Tensor& x) -> Tensor {
-        auto tmp = std::exchange(x, empty_like(x, device));
-        Copy(tmp, x);
-        return tmp;
-    };
-
-    std::vector<Tensor> tmp_cpu_tensors;
+    TM_CHECK(device.type == kCPU || device.type == kDEVICE);
+    core::ContextGuard guard{stream_, alloca_, Allocator{kCPUpinned}};

     auto tensor_ptr_map = get_parameters();
     for (auto& [name, tensor_ptr] : tensor_ptr_map) {
-        auto tmp_tensor = to_device(*tensor_ptr);
-        if (tmp_tensor.device().type != kDEVICE) {
-            tmp_cpu_tensors.push_back(tmp_tensor);
+        if (device.type == kCPU) {
+            if (pinned_weights_.find(name) == pinned_weights_.end()) {
+                pinned_weights_[name] = empty_like(*tensor_ptr, kCPUpinned);
+                Copy(*tensor_ptr, pinned_weights_[name]);
+            }
+            *tensor_ptr = {};
+        }
+        else {
+            TM_CHECK(pinned_weights_.find(name) != pinned_weights_.end());
+            *tensor_ptr = empty_like(pinned_weights_[name], kDEVICE);
+            Copy(pinned_weights_[name], *tensor_ptr);
         }
     }
     core::Context::stream().Sync();
```
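
The rewritten loop replaces the old throwaway CPU copies with a persistent cache: on the first sleep each weight is copied into a pinned host buffer stored in `pinned_weights_`, later sleeps just drop the device tensor, and wakeup re-allocates on device and refills from the pinned copy. A PyTorch sketch of the same caching scheme (function and variable names are illustrative, not lmdeploy API):

```python
import torch

# Sketch of the pinned-buffer caching scheme: the pinned host copy is
# created and filled once, then reused on every later sleep/wakeup cycle.
pinned_weights: dict[str, torch.Tensor] = {}

def offload(params: dict[str, torch.Tensor]) -> None:
    """Sleep: stash weights in pinned host buffers, free device copies."""
    for name, tensor in params.items():
        if name not in pinned_weights:            # first sleep only
            buf = torch.empty_like(tensor, device='cpu', pin_memory=True)
            buf.copy_(tensor, non_blocking=True)
            pinned_weights[name] = buf
        params[name] = torch.empty(0)             # release the device tensor
    torch.cuda.synchronize()

def restore(params: dict[str, torch.Tensor]) -> None:
    """Wakeup: re-allocate on device and refill from the pinned buffers."""
    for name in params:
        assert name in pinned_weights, f'{name} was never offloaded'
        params[name] = pinned_weights[name].to('cuda', non_blocking=True)
    torch.cuda.synchronize()
```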

src/turbomind/models/llama/LlamaWeight.h

Lines changed: 4 additions & 0 deletions
```diff
@@ -20,6 +20,8 @@

 #pragma once

+#include <unordered_map>
+
 #include "src/turbomind/core/context.h"
 #include "src/turbomind/models/llama/LlamaDecoderLayerWeight.h"
 #include "src/turbomind/models/llama/LlamaDenseWeight.h"
@@ -75,6 +77,8 @@ struct LlamaWeight: core::Module {
     DataType data_type_;
     DataType weight_type_;

+    std::unordered_map<std::string, Tensor> pinned_weights_;
+
     int tp_size_; // this will follow attn tp param
     int tp_rank_;
```
