Skip to content

Commit eb7eaa1

Browse files
committed
rate limit
1 parent 9d72aed commit eb7eaa1

File tree

8 files changed

+370
-6
lines changed

8 files changed

+370
-6
lines changed

ucm/integration/vllm/ucm_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ def get_num_new_matched_tokens(
374374
except RuntimeError as e:
375375
external_hit_blocks = 0
376376
logger.error(f"request {request.request_id} look up error. {e}")
377-
logger.info(
377+
logger.info_once(
378378
f"request_id: {request.request_id}, "
379379
f"total_blocks_num: {len(ucm_block_ids)}, "
380380
f"hit hbm: {hbm_hit_block_num}, "

ucm/logger.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def format_log_msg(msg, *args) -> str:
105105
return msg % args
106106
return msg
107107

108-
def log(self, levelno, message, *args, exc_info=None, scope=None):
108+
def log(self, levelno, message, *args, exc_info=None, scope=None, rate_limit=False):
109109
level = LevelMap[levelno]
110110
frame = inspect.currentframe()
111111
caller_frame = frame.f_back.f_back
@@ -116,6 +116,9 @@ def log(self, levelno, message, *args, exc_info=None, scope=None):
116116
if exc_info:
117117
exc_text = self.format_exception(exc_info)
118118
msg = msg + "\n" + exc_text
119+
if rate_limit:
120+
ucmlogger.log_rate_limit(level, file, func, line, msg)
121+
return
119122
ucmlogger.log(level, file, func, line, msg)
120123

121124
@staticmethod
@@ -148,6 +151,15 @@ def debug_once(self, message: str, *args: Hashable, **kwargs: Hashable):
148151
def exception(self, message: str, *args: Hashable, **kwargs: Hashable):
149152
self.log(logging.ERROR, message, *args, **kwargs, exc_info=True)
150153

154+
# Rate-limited INFO log. Forwards to log() with rate_limit=True, which makes
# log() hand the message to ucmlogger.log_rate_limit instead of ucmlogger.log.
# log() resolves the call site from the caller's frame (frame.f_back.f_back),
# so the limit is keyed on the file/line that invoked info_limit.
def info_limit(self, message: str, *args, **kwargs):
155+
self.log(logging.INFO, message, *args, **kwargs, rate_limit=True)
156+
157+
# Rate-limited WARNING log. Forwards to log() with rate_limit=True, which makes
# log() hand the message to ucmlogger.log_rate_limit instead of ucmlogger.log.
# NOTE(review): passing rate_limit in **kwargs would raise TypeError (duplicate
# keyword) because it is always supplied here.
def warning_limit(self, message: str, *args, **kwargs):
158+
self.log(logging.WARNING, message, *args, **kwargs, rate_limit=True)
159+
160+
# Rate-limited DEBUG log. Forwards to log() with rate_limit=True, which makes
# log() hand the message to ucmlogger.log_rate_limit instead of ucmlogger.log.
def debug_limit(self, message: str, *args, **kwargs):
161+
self.log(logging.DEBUG, message, *args, **kwargs, rate_limit=True)
162+
151163

152164
def init_logger(name: str = "UC") -> Logger:
153165
return Logger(name)

ucm/shared/infra/logger/cc/spdlog_logger.cc

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@
3131
#include <spdlog/spdlog.h>
3232
#include "compress_rotate_file_sink.h"
3333
#include "logger.h"
34-
3534
namespace UC::Logger {
36-
35+
// Length of one rate-limit window, in milliseconds.
constexpr uint64_t LIMIT_THRESHOLD_MS = 60000;
// Maximum number of messages a single call site may emit per window.
constexpr uint32_t RATE_LIMIT_MAX_LOGS_PER_WINDOW = 3;
// The per-call-site state packs (window_start_ms << kRateLimitCountBits) | count
// into one 64-bit word; the low kRateLimitCountBits bits hold the count.
constexpr uint32_t kRateLimitCountBits = 2;
static_assert(kRateLimitCountBits < 64, "count field must leave room for the window start");
// Computed in 64-bit so the mask stays correct if the count field is ever widened.
constexpr uint64_t kRateLimitCountMask = (uint64_t{1} << kRateLimitCountBits) - 1u;
// The count reaches RATE_LIMIT_MAX_LOGS_PER_WINDOW before the limiter rejects,
// so that value must be representable in the count field; otherwise an
// increment would carry into the window-start bits.
static_assert(RATE_LIMIT_MAX_LOGS_PER_WINDOW <= kRateLimitCountMask,
              "RATE_LIMIT_MAX_LOGS_PER_WINDOW does not fit in kRateLimitCountBits");
// Constants used to mix the source line into the hash of the file name.
constexpr size_t kHashMixMagic = 0x9e3779b97f4a7c15ULL;
constexpr size_t kHashShiftLeft = 12;
constexpr size_t kHashShiftRight = 4;
3742
static spdlog::level::level_enum SpdLevels[] = {spdlog::level::debug, spdlog::level::info,
3843
spdlog::level::warn, spdlog::level::err,
3944
spdlog::level::critical};
@@ -45,6 +50,81 @@ void Logger::Log(Level&& lv, SourceLocation&& loc, std::string&& msg)
4550
this->logger_->log(spdlog::source_loc{loc.file, loc.line, loc.func}, level, std::move(msg));
4651
}
4752

53+
/**
 * @brief Read std::chrono::steady_clock as whole milliseconds since that clock's epoch.
 * @return Millisecond count converted to uint64_t.
 */
inline uint64_t GetCurrentTimeMs()
{
    const auto since_epoch = std::chrono::steady_clock::now().time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
}
59+
60+
bool Logger::FilterCallSite(const char* file, int line)
61+
{
62+
uint64_t now = GetCurrentTimeMs();
63+
const std::string_view fv(file);
64+
std::hash<std::string_view> h;
65+
size_t x = h(fv);
66+
x ^= static_cast<size_t>(line) + kHashMixMagic + (x << kHashShiftLeft) + (x >> kHashShiftRight);
67+
const uint64_t full_hash = static_cast<uint64_t>(x);
68+
const size_t slot_idx = static_cast<size_t>(full_hash % HASH_SLOT_NUM);
69+
// key_tag=0 is reserved for empty; so shift by +1.
70+
const uint64_t key_tag = full_hash + 1u;
71+
72+
auto& slot = hash_slots_[slot_idx];
73+
std::atomic<uint64_t>* rate_state = nullptr;
74+
75+
// 1) Lookup: find an existing chain entry with the same key.
76+
for (size_t i = 0; i < HASH_CHAIN_LEN; ++i) {
77+
uint64_t stored = slot.chain_entries[i].key_hash.load(std::memory_order_relaxed);
78+
if (stored == key_tag) {
79+
rate_state = &slot.chain_entries[i].rate_limit_state;
80+
break;
81+
}
82+
}
83+
84+
// 2) Insert: if key not found, try to claim an empty entry.
85+
if (rate_state == nullptr) {
86+
for (size_t i = 0; i < HASH_CHAIN_LEN; ++i) {
87+
uint64_t expected_empty = 0;
88+
if (slot.chain_entries[i].key_hash.compare_exchange_strong(expected_empty, key_tag,
89+
std::memory_order_relaxed,
90+
std::memory_order_relaxed)) {
91+
rate_state = &slot.chain_entries[i].rate_limit_state;
92+
break;
93+
}
94+
}
95+
}
96+
97+
// 3) Evict: if the chain is full, overwrite a deterministic entry.
98+
if (rate_state == nullptr) {
99+
const size_t evict_idx = static_cast<size_t>(key_tag % HASH_CHAIN_LEN);
100+
rate_state = &slot.chain_entries[evict_idx].rate_limit_state;
101+
slot.chain_entries[evict_idx].key_hash.store(key_tag, std::memory_order_relaxed);
102+
slot.chain_entries[evict_idx].rate_limit_state.store(0, std::memory_order_relaxed);
103+
}
104+
105+
uint64_t s = rate_state->load(std::memory_order_relaxed);
106+
const uint64_t window_start = s >> kRateLimitCountBits;
107+
const uint32_t count = static_cast<uint32_t>(s & kRateLimitCountMask);
108+
109+
if (s == 0 || now - window_start > LIMIT_THRESHOLD_MS) {
110+
const uint64_t desired = (now << kRateLimitCountBits) | 1u;
111+
if (rate_state->compare_exchange_strong(s, desired, std::memory_order_relaxed,
112+
std::memory_order_relaxed)) {
113+
return true;
114+
}
115+
return false;
116+
}
117+
118+
if (count >= RATE_LIMIT_MAX_LOGS_PER_WINDOW) { return false; }
119+
const uint64_t desired =
120+
(window_start << kRateLimitCountBits) | static_cast<uint64_t>(count + 1u);
121+
if (rate_state->compare_exchange_strong(s, desired, std::memory_order_relaxed,
122+
std::memory_order_relaxed)) {
123+
return true;
124+
}
125+
return false;
126+
}
127+
48128
std::shared_ptr<spdlog::logger> Logger::Make()
49129
{
50130
if (this->logger_) { return this->logger_; }

ucm/shared/infra/logger/cc/spdlog_logger.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,17 @@
2323
* */
2424
#ifndef UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H
2525
#define UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H
26+
#include <array>
27+
#include <atomic>
28+
#include <chrono>
2629
#include <csignal>
2730
#include <cstdlib>
31+
#include <mutex>
2832
#include <spdlog/spdlog.h>
2933
namespace UC::Logger {
3034

35+
constexpr size_t HASH_SLOT_NUM = 512;
36+
constexpr size_t HASH_CHAIN_LEN = 4;
3137
enum class Level { DEBUG, INFO, WARN, ERROR, CRITICAL };
3238
struct SourceLocation {
3339
const char* file = "";
@@ -68,7 +74,20 @@ class Logger {
6874

6975
bool IsEnabledFor(Level lv);
7076

77+
bool FilterCallSite(const char* file, int line);
78+
7179
private:
80+
// One tracked call site inside a hash slot.
struct ChainEntryData {
81+
// Call-site hash plus one, as written by FilterCallSite; 0 means the entry is empty.
std::atomic<uint64_t> key_hash{0};
82+
// Packed limiter state maintained by FilterCallSite:
// (window start in ms << 2) | messages emitted in that window; 0 means unused.
std::atomic<uint64_t> rate_limit_state{0};
83+
};
84+
85+
// One hash slot: a fixed-size chain of HASH_CHAIN_LEN call-site entries.
struct SlotData {
86+
std::array<ChainEntryData, HASH_CHAIN_LEN> chain_entries;
87+
};
88+
89+
std::array<SlotData, HASH_SLOT_NUM> hash_slots_;
90+
7291
std::shared_ptr<spdlog::logger> Make();
7392
std::string path_{"log"};
7493
int max_files_{3};

ucm/shared/infra/logger/cpy/spdlog_logger.py.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,17 @@ void LogWrapper(Level lv, std::string file, std::string func, int line, std::str
3232
Log(std::move(lv), std::move(file), std::move(func), line, std::move(msg));
3333
}
3434

35+
// Rate-limited counterpart of LogWrapper; registered below as "log_rate_limit".
// Forwards every argument to LogRateLimit.
void RateLimitLogWrapper(Level lv, std::string file, std::string func, int line, std::string msg)
36+
{
37+
LogRateLimit(std::move(lv), std::move(file), std::move(func), line, std::move(msg));
38+
}
39+
3540
PYBIND11_MODULE(ucmlogger, m)
3641
{
3742
m.def("setup", &Setup);
3843
m.def("flush", &Flush);
3944
m.def("log", &LogWrapper);
45+
m.def("log_rate_limit", &RateLimitLogWrapper);
4046
m.def("isEnabledFor", &isEnabledFor);
4147
py::enum_<Level>(m, "Level")
4248
.value("DEBUG", Level::DEBUG)

ucm/shared/infra/logger/logger.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ void Log(Level lv, std::string file, std::string func, int line, std::string msg
3333
std::move(msg));
3434
}
3535

36+
/**
 * @brief Emit a message unless its call site has exhausted its rate-limit quota.
 *
 * @param lv   Severity of the message.
 * @param file Source file of the call site; together with @p line it keys the limiter.
 * @param func Function name of the call site.
 * @param line Source line of the call site.
 * @param msg  Fully formatted message text.
 */
void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg)
{
    auto&& logger = Logger::GetInstance();
    // Skip the limiter for levels that are switched off, so suppressed
    // messages do not occupy (or evict) entries in the fixed-size call-site
    // table. NOTE(review): assumes IsEnabledFor reports whether `lv` would be
    // written by the sink — confirm against its definition.
    if (!logger.IsEnabledFor(lv)) { return; }
    if (!logger.FilterCallSite(file.c_str(), line)) { return; }
    logger.Log(std::move(lv), SourceLocation{file.c_str(), func.c_str(), line}, std::move(msg));
}
43+
3644
void Setup(const std::string& path, int max_files, int max_size)
3745
{
3846
Logger::GetInstance().Setup(path, max_files, max_size);

ucm/shared/infra/logger/logger.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,39 @@
3232
namespace UC::Logger {
3333

3434
void Log(Level lv, std::string file, std::string func, int line, std::string msg);
35+
void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg);
3536

3637
template <typename... Args>
3738
void Log(Level lv, const SourceLocation& loc, fmt::format_string<Args...> fmt, Args&&... args)
3839
{
3940
std::string msg = fmt::format(fmt, std::forward<Args>(args)...);
4041
Log(lv, std::string(loc.file), std::string(loc.func), loc.line, std::move(msg));
4142
}
43+
44+
template <typename... Args>
45+
void LogRateLimit(Level lv, const SourceLocation& loc, fmt::format_string<Args...> fmt,
46+
Args&&... args)
47+
{
48+
std::string msg = fmt::format(fmt, std::forward<Args>(args)...);
49+
LogRateLimit(lv, std::string(loc.file), std::string(loc.func), loc.line, std::move(msg));
50+
}
51+
4252
void Setup(const std::string& path, int max_files, int max_size);
4353
void Flush();
4454
bool isEnabledFor(Level lv);
4555

4656
} // namespace UC::Logger
4757
#define UC_SOURCE_LOCATION {__FILE__, __FUNCTION__, __LINE__}
48-
#define UC_LOG(lv, fmt, ...) UC::Logger::Log(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
58+
// UC_LOG_UNLIMITED: logs through UC::Logger::Log, i.e. without the per-call-site rate limit.
#define UC_LOG_UNLIMITED(lv, fmt, ...) \
59+
UC::Logger::Log(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
60+
// UC_LOG: logs through UC::Logger::LogRateLimit, so repeated messages from the
// same call site (file and line from UC_SOURCE_LOCATION) may be suppressed.
#define UC_LOG(lv, fmt, ...) \
61+
UC::Logger::LogRateLimit(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
62+
// Per-level shortcuts that bypass the rate limit.
#define UC_DEBUG_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__)
63+
#define UC_INFO_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__)
64+
#define UC_WARN_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__)
65+
#define UC_ERROR_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__)
4966
// Per-level shortcuts built on UC_LOG; these are rate-limited, including UC_ERROR.
#define UC_DEBUG(fmt, ...) UC_LOG(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__)
5067
#define UC_INFO(fmt, ...) UC_LOG(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__)
5168
#define UC_WARN(fmt, ...) UC_LOG(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__)
5269
#define UC_ERROR(fmt, ...) UC_LOG(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__)
53-
5470
#endif

0 commit comments

Comments
 (0)