Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ucm/integration/vllm/ucm_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def get_num_new_matched_tokens(
except RuntimeError as e:
external_hit_blocks = 0
logger.error(f"request {request.request_id} look up error. {e}")
logger.info(
logger.info_once(
f"request_id: {request.request_id}, "
f"total_blocks_num: {len(ucm_block_ids)}, "
f"hit hbm: {hbm_hit_block_num}, "
Expand Down
14 changes: 13 additions & 1 deletion ucm/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def format_log_msg(msg, *args) -> str:
return msg % args
return msg

def log(self, levelno, message, *args, exc_info=None, scope=None):
def log(self, levelno, message, *args, exc_info=None, scope=None, rate_limit=False):
level = LevelMap[levelno]
frame = inspect.currentframe()
caller_frame = frame.f_back.f_back
Expand All @@ -116,6 +116,9 @@ def log(self, levelno, message, *args, exc_info=None, scope=None):
if exc_info:
exc_text = self.format_exception(exc_info)
msg = msg + "\n" + exc_text
if rate_limit:
ucmlogger.log_rate_limit(level, file, func, line, msg)
return
ucmlogger.log(level, file, func, line, msg)

@staticmethod
Expand Down Expand Up @@ -148,6 +151,15 @@ def debug_once(self, message: str, *args: Hashable, **kwargs: Hashable):
def exception(self, message: str, *args: Hashable, **kwargs: Hashable):
self.log(logging.ERROR, message, *args, **kwargs, exc_info=True)

def info_limit(self, message: str, *args, **kwargs):
self.log(logging.INFO, message, *args, **kwargs, rate_limit=True)

def warning_limit(self, message: str, *args, **kwargs):
self.log(logging.WARNING, message, *args, **kwargs, rate_limit=True)

def debug_limit(self, message: str, *args, **kwargs):
self.log(logging.DEBUG, message, *args, **kwargs, rate_limit=True)


def init_logger(name: str = "UC") -> Logger:
return Logger(name)
Expand Down
84 changes: 82 additions & 2 deletions ucm/shared/infra/logger/cc/spdlog_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@
#include <spdlog/spdlog.h>
#include "compress_rotate_file_sink.h"
#include "logger.h"

namespace UC::Logger {

constexpr uint64_t LIMIT_THRESHOLD_MS = 60000;
constexpr uint32_t RATE_LIMIT_MAX_LOGS_PER_WINDOW = 3;
constexpr uint32_t kRateLimitCountBits = 2;
constexpr uint64_t kRateLimitCountMask = (1u << kRateLimitCountBits) - 1u;
constexpr size_t kHashMixMagic = 0x9e3779b97f4a7c15ULL;
constexpr size_t kHashShiftLeft = 12;
constexpr size_t kHashShiftRight = 4;
static spdlog::level::level_enum SpdLevels[] = {spdlog::level::debug, spdlog::level::info,
spdlog::level::warn, spdlog::level::err,
spdlog::level::critical};
Expand All @@ -45,6 +50,81 @@ void Logger::Log(Level&& lv, SourceLocation&& loc, std::string&& msg)
this->logger_->log(spdlog::source_loc{loc.file, loc.line, loc.func}, level, std::move(msg));
}

inline uint64_t GetCurrentTimeMs()
{
auto now = std::chrono::steady_clock::now();
auto ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
return ms.time_since_epoch().count();
}

bool Logger::FilterCallSite(const char* file, int line)
{
uint64_t now = GetCurrentTimeMs();
const std::string_view fv(file);
std::hash<std::string_view> h;
size_t x = h(fv);
x ^= static_cast<size_t>(line) + kHashMixMagic + (x << kHashShiftLeft) + (x >> kHashShiftRight);
const uint64_t full_hash = static_cast<uint64_t>(x);
const size_t slot_idx = static_cast<size_t>(full_hash % HASH_SLOT_NUM);
// key_tag=0 is reserved for empty; so shift by +1.
const uint64_t key_tag = full_hash + 1u;

auto& slot = hash_slots_[slot_idx];
std::atomic<uint64_t>* rate_state = nullptr;

// 1) Lookup: find an existing chain entry with the same key.
for (size_t i = 0; i < HASH_CHAIN_LEN; ++i) {
uint64_t stored = slot.chain_entries[i].key_hash.load(std::memory_order_relaxed);
if (stored == key_tag) {
rate_state = &slot.chain_entries[i].rate_limit_state;
break;
}
}

// 2) Insert: if key not found, try to claim an empty entry.
if (rate_state == nullptr) {
for (size_t i = 0; i < HASH_CHAIN_LEN; ++i) {
uint64_t expected_empty = 0;
if (slot.chain_entries[i].key_hash.compare_exchange_strong(expected_empty, key_tag,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
rate_state = &slot.chain_entries[i].rate_limit_state;
break;
}
}
}

// 3) Evict: if the chain is full, overwrite a deterministic entry.
if (rate_state == nullptr) {
const size_t evict_idx = static_cast<size_t>(key_tag % HASH_CHAIN_LEN);
rate_state = &slot.chain_entries[evict_idx].rate_limit_state;
slot.chain_entries[evict_idx].key_hash.store(key_tag, std::memory_order_relaxed);
slot.chain_entries[evict_idx].rate_limit_state.store(0, std::memory_order_relaxed);
}

uint64_t s = rate_state->load(std::memory_order_relaxed);
const uint64_t window_start = s >> kRateLimitCountBits;
const uint32_t count = static_cast<uint32_t>(s & kRateLimitCountMask);

if (s == 0 || now - window_start > LIMIT_THRESHOLD_MS) {
const uint64_t desired = (now << kRateLimitCountBits) | 1u;
if (rate_state->compare_exchange_strong(s, desired, std::memory_order_relaxed,
std::memory_order_relaxed)) {
return true;
}
return false;
}

if (count >= RATE_LIMIT_MAX_LOGS_PER_WINDOW) { return false; }
const uint64_t desired =
(window_start << kRateLimitCountBits) | static_cast<uint64_t>(count + 1u);
if (rate_state->compare_exchange_strong(s, desired, std::memory_order_relaxed,
std::memory_order_relaxed)) {
return true;
}
return false;
}

std::shared_ptr<spdlog::logger> Logger::Make()
{
if (this->logger_) { return this->logger_; }
Expand Down
19 changes: 19 additions & 0 deletions ucm/shared/infra/logger/cc/spdlog_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
* */
#ifndef UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H
#define UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H
#include <array>
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdlib>
#include <mutex>
#include <spdlog/spdlog.h>
namespace UC::Logger {

constexpr size_t HASH_SLOT_NUM = 512;
constexpr size_t HASH_CHAIN_LEN = 4;
enum class Level { DEBUG, INFO, WARN, ERROR, CRITICAL };
struct SourceLocation {
const char* file = "";
Expand Down Expand Up @@ -68,7 +74,20 @@ class Logger {

bool IsEnabledFor(Level lv);

bool FilterCallSite(const char* file, int line);

private:
struct ChainEntryData {
std::atomic<uint64_t> key_hash{0};
std::atomic<uint64_t> rate_limit_state{0};
};

struct SlotData {
std::array<ChainEntryData, HASH_CHAIN_LEN> chain_entries;
};

std::array<SlotData, HASH_SLOT_NUM> hash_slots_;

std::shared_ptr<spdlog::logger> Make();
std::string path_{"log"};
int max_files_{3};
Expand Down
6 changes: 6 additions & 0 deletions ucm/shared/infra/logger/cpy/spdlog_logger.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ void LogWrapper(Level lv, std::string file, std::string func, int line, std::str
Log(std::move(lv), std::move(file), std::move(func), line, std::move(msg));
}

void RateLimitLogWrapper(Level lv, std::string file, std::string func, int line, std::string msg)
{
LogRateLimit(std::move(lv), std::move(file), std::move(func), line, std::move(msg));
}

PYBIND11_MODULE(ucmlogger, m)
{
m.def("setup", &Setup);
m.def("flush", &Flush);
m.def("log", &LogWrapper);
m.def("log_rate_limit", &RateLimitLogWrapper);
m.def("isEnabledFor", &isEnabledFor);
py::enum_<Level>(m, "Level")
.value("DEBUG", Level::DEBUG)
Expand Down
8 changes: 8 additions & 0 deletions ucm/shared/infra/logger/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ void Log(Level lv, std::string file, std::string func, int line, std::string msg
std::move(msg));
}

void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg)
{
if (Logger::GetInstance().FilterCallSite(file.c_str(), line)) {
Logger::GetInstance().Log(std::move(lv), SourceLocation{file.c_str(), func.c_str(), line},
std::move(msg));
}
}

void Setup(const std::string& path, int max_files, int max_size)
{
Logger::GetInstance().Setup(path, max_files, max_size);
Expand Down
20 changes: 18 additions & 2 deletions ucm/shared/infra/logger/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,39 @@
namespace UC::Logger {

void Log(Level lv, std::string file, std::string func, int line, std::string msg);
void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg);

template <typename... Args>
void Log(Level lv, const SourceLocation& loc, fmt::format_string<Args...> fmt, Args&&... args)
{
std::string msg = fmt::format(fmt, std::forward<Args>(args)...);
Log(lv, std::string(loc.file), std::string(loc.func), loc.line, std::move(msg));
}

template <typename... Args>
void LogRateLimit(Level lv, const SourceLocation& loc, fmt::format_string<Args...> fmt,
Args&&... args)
{
std::string msg = fmt::format(fmt, std::forward<Args>(args)...);
LogRateLimit(lv, std::string(loc.file), std::string(loc.func), loc.line, std::move(msg));
}

void Setup(const std::string& path, int max_files, int max_size);
void Flush();
bool isEnabledFor(Level lv);

} // namespace UC::Logger
#define UC_SOURCE_LOCATION {__FILE__, __FUNCTION__, __LINE__}
#define UC_LOG(lv, fmt, ...) UC::Logger::Log(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
#define UC_LOG_UNLIMITED(lv, fmt, ...) \
UC::Logger::Log(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
#define UC_LOG(lv, fmt, ...) \
UC::Logger::LogRateLimit(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__)
#define UC_DEBUG_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__)
#define UC_INFO_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__)
#define UC_WARN_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__)
#define UC_ERROR_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__)
#define UC_DEBUG(fmt, ...) UC_LOG(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__)
#define UC_INFO(fmt, ...) UC_LOG(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__)
#define UC_WARN(fmt, ...) UC_LOG(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__)
#define UC_ERROR(fmt, ...) UC_LOG(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__)

#endif
Loading
Loading