Skip to content

Commit 2cb6342

Browse files
committed
rate limit
1 parent 2423da5 commit 2cb6342

7 files changed

Lines changed: 91 additions & 3 deletions

File tree

ucm/integration/vllm/ucm_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ def get_num_new_matched_tokens(
354354
except RuntimeError as e:
355355
external_hit_blocks = 0
356356
logger.error(f"request {request.request_id} look up error. {e}")
357-
logger.info(
357+
logger.info_once(
358358
f"request_id: {request.request_id}, "
359359
f"total_blocks_num: {len(ucm_block_ids)}, "
360360
f"hit hbm: {hbm_hit_block_num}, "

ucm/logger.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def format_log_msg(msg, *args) -> str:
105105
return msg % args
106106
return msg
107107

108-
def log(self, levelno, message, *args, exc_info=None, scope=None):
108+
def log(self, levelno, message, *args, exc_info=None, scope=None, rate_limit=False):
109109
level = LevelMap[levelno]
110110
frame = inspect.currentframe()
111111
caller_frame = frame.f_back.f_back
@@ -116,6 +116,9 @@ def log(self, levelno, message, *args, exc_info=None, scope=None):
116116
if exc_info:
117117
exc_text = self.format_exception(exc_info)
118118
msg = msg + "\n" + exc_text
119+
if rate_limit:
120+
ucmlogger.log_rate_limit(level, file, func, line, msg, message)
121+
return
119122
ucmlogger.log(level, file, func, line, msg)
120123

121124
@staticmethod
@@ -148,6 +151,15 @@ def debug_once(self, message: str, *args: Hashable, **kwargs: Hashable):
148151
def exception(self, message: str, *args: Hashable, **kwargs: Hashable):
149152
self.log(logging.ERROR, message, *args, **kwargs, exc_info=True)
150153

154+
def info_limit(self, message: str, *args, **kwargs):
155+
self.log(logging.INFO, message, *args, **kwargs, rate_limit=True)
156+
157+
def warning_limit(self, message: str, *args, **kwargs):
158+
self.log(logging.WARNING, message, *args, **kwargs, rate_limit=True)
159+
160+
def debug_limit(self, message: str, *args, **kwargs):
161+
self.log(logging.DEBUG, message, *args, **kwargs, rate_limit=True)
162+
151163

152164
def init_logger(name: str = "UC") -> Logger:
153165
return Logger(name)

ucm/shared/infra/logger/cc/spdlog_logger.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,39 @@ void Logger::Log(Level&& lv, SourceLocation&& loc, std::string&& msg)
4545
this->logger_->log(spdlog::source_loc{loc.file, loc.line, loc.func}, level, std::move(msg));
4646
}
4747

48+
bool Logger::Filter(std::string&& msg)
49+
{
50+
using Clock = std::chrono::steady_clock;
51+
const auto now = Clock::now();
52+
53+
const std::size_t key = std::hash<std::string>{}(msg);
54+
55+
std::lock_guard<std::mutex> lg(this->mutex_);
56+
57+
auto it = cache_.find(key);
58+
if (it != cache_.end()) {
59+
auto& entry = it->second;
60+
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - entry.last_time);
61+
if (elapsed < rate_limit_window_) {
62+
lru_list_.splice(lru_list_.begin(), lru_list_, entry.lru_it);
63+
return false;
64+
}
65+
entry.last_time = now;
66+
lru_list_.splice(lru_list_.begin(), lru_list_, entry.lru_it);
67+
return true;
68+
}
69+
70+
if (cache_.size() >= cache_capacity_ && !lru_list_.empty()) {
71+
auto lru_it = std::prev(lru_list_.end());
72+
cache_.erase(*lru_it);
73+
lru_list_.pop_back();
74+
}
75+
76+
lru_list_.push_front(key);
77+
cache_.emplace(lru_list_.front(), CacheEntry{now, lru_list_.begin()});
78+
return true;
79+
}
80+
4881
std::shared_ptr<spdlog::logger> Logger::Make()
4982
{
5083
if (this->logger_) { return this->logger_; }

ucm/shared/infra/logger/cc/spdlog_logger.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@
2323
* */
2424
#ifndef UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H
2525
#define UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H
26+
#include <chrono>
2627
#include <csignal>
2728
#include <cstdlib>
29+
#include <list>
30+
#include <mutex>
2831
#include <spdlog/spdlog.h>
32+
#include <string>
33+
#include <unordered_map>
2934
namespace UC::Logger {
3035

3136
enum class Level { DEBUG, INFO, WARN, ERROR, CRITICAL };
@@ -68,7 +73,20 @@ class Logger {
6873

6974
bool IsEnabledFor(Level lv);
7075

76+
bool Filter(std::string&& msg);
77+
7178
private:
79+
using TimePoint = std::chrono::steady_clock::time_point;
80+
struct CacheEntry {
81+
TimePoint last_time;
82+
std::list<std::size_t>::iterator lru_it;
83+
};
84+
85+
std::list<std::size_t> lru_list_;
86+
std::unordered_map<std::size_t, CacheEntry> cache_;
87+
std::size_t cache_capacity_{128};
88+
std::chrono::seconds rate_limit_window_{std::chrono::seconds(60)};
89+
7290
std::shared_ptr<spdlog::logger> Make();
7391
std::string path_{"log"};
7492
int max_files_{3};

ucm/shared/infra/logger/cpy/spdlog_logger.py.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,19 @@ void LogWrapper(Level lv, std::string file, std::string func, int line, std::str
3232
Log(std::move(lv), std::move(file), std::move(func), line, std::move(msg));
3333
}
3434

35+
void RateLimitLogWrapper(Level lv, std::string file, std::string func, int line, std::string msg,
36+
std::string ori_msg)
37+
{
38+
LogRateLimit(std::move(lv), std::move(file), std::move(func), line, std::move(msg),
39+
std::move(ori_msg));
40+
}
41+
3542
PYBIND11_MODULE(ucmlogger, m)
3643
{
3744
m.def("setup", &Setup);
3845
m.def("flush", &Flush);
3946
m.def("log", &LogWrapper);
47+
m.def("log_rate_limit", &RateLimitLogWrapper);
4048
m.def("isEnabledFor", &isEnabledFor);
4149
py::enum_<Level>(m, "Level")
4250
.value("DEBUG", Level::DEBUG)

ucm/shared/infra/logger/logger.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ void Log(Level lv, std::string file, std::string func, int line, std::string msg
3333
std::move(msg));
3434
}
3535

36+
void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg,
37+
std::string ori_msg)
38+
{
39+
std::string full_msg = ori_msg + file + func + std::to_string(line);
40+
if (Logger::GetInstance().Filter(std::move(full_msg))) {
41+
Logger::GetInstance().Log(std::move(lv), SourceLocation{file.c_str(), func.c_str(), line},
42+
std::move(msg));
43+
}
44+
}
45+
3646
void Setup(const std::string& path, int max_files, int max_size)
3747
{
3848
Logger::GetInstance().Setup(path, max_files, max_size);

ucm/shared/infra/logger/logger.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
namespace UC::Logger {
3333

3434
void Log(Level lv, std::string file, std::string func, int line, std::string msg);
35+
void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg,
36+
std::string ori_msg);
3537

3638
template <typename... Args>
3739
void Log(Level lv, const SourceLocation& loc, fmt::format_string<Args...> fmt, Args&&... args)
@@ -46,9 +48,14 @@ bool isEnabledFor(Level lv);
4648
} // namespace UC::Logger
4749
#define UC_SOURCE_LOCATION {__FILE__, __FUNCTION__, __LINE__}
4850
#define UC_LOG(lv, fmt, ...) UC::Logger::Log(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
51+
#define UC_LOG_LIMIT(lv, fmt, ...) \
52+
UC::Logger::LogRateLimit(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
4953
#define UC_DEBUG(fmt, ...) UC_LOG(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__)
5054
#define UC_INFO(fmt, ...) UC_LOG(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__)
5155
#define UC_WARN(fmt, ...) UC_LOG(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__)
5256
#define UC_ERROR(fmt, ...) UC_LOG(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__)
53-
57+
#define UC_DEBUG_LIMIT(fmt, ...) UC_LOG_LIMIT(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__)
58+
#define UC_INFO_LIMIT(fmt, ...) UC_LOG_LIMIT(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__)
59+
#define UC_WARN_LIMIT(fmt, ...) UC_LOG_LIMIT(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__)
60+
#define UC_ERROR_LIMIT(fmt, ...) UC_LOG_LIMIT(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__)
5461
#endif

0 commit comments

Comments
 (0)