diff --git a/ucm/integration/vllm/ucm_connector.py b/ucm/integration/vllm/ucm_connector.py index 0c68c616b..3eefe00ef 100644 --- a/ucm/integration/vllm/ucm_connector.py +++ b/ucm/integration/vllm/ucm_connector.py @@ -383,7 +383,7 @@ def get_num_new_matched_tokens( except RuntimeError as e: external_hit_blocks = 0 logger.error(f"request {request.request_id} look up error. {e}") - logger.info( + logger.info_once( f"request_id: {request.request_id}, " f"total_blocks_num: {len(ucm_block_ids)}, " f"hit hbm: {hbm_hit_block_num}, " diff --git a/ucm/logger.py b/ucm/logger.py index 18cac76b0..64b09e711 100644 --- a/ucm/logger.py +++ b/ucm/logger.py @@ -105,7 +105,7 @@ def format_log_msg(msg, *args) -> str: return msg % args return msg - def log(self, levelno, message, *args, exc_info=None, scope=None): + def log(self, levelno, message, *args, exc_info=None, scope=None, rate_limit=False): level = LevelMap[levelno] frame = inspect.currentframe() caller_frame = frame.f_back.f_back @@ -116,6 +116,9 @@ def log(self, levelno, message, *args, exc_info=None, scope=None): if exc_info: exc_text = self.format_exception(exc_info) msg = msg + "\n" + exc_text + if rate_limit: + ucmlogger.log_rate_limit(level, file, func, line, msg) + return ucmlogger.log(level, file, func, line, msg) @staticmethod @@ -148,6 +151,15 @@ def debug_once(self, message: str, *args: Hashable, **kwargs: Hashable): def exception(self, message: str, *args: Hashable, **kwargs: Hashable): self.log(logging.ERROR, message, *args, **kwargs, exc_info=True) + def info_limit(self, message: str, *args, **kwargs): + self.log(logging.INFO, message, *args, **kwargs, rate_limit=True) + + def warning_limit(self, message: str, *args, **kwargs): + self.log(logging.WARNING, message, *args, **kwargs, rate_limit=True) + + def debug_limit(self, message: str, *args, **kwargs): + self.log(logging.DEBUG, message, *args, **kwargs, rate_limit=True) + def init_logger(name: str = "UC") -> Logger: return Logger(name) diff --git a/ucm/shared/infra/logger/cc/spdlog_logger.cc b/ucm/shared/infra/logger/cc/spdlog_logger.cc index 5845fc6e4..62560facb 100644 --- a/ucm/shared/infra/logger/cc/spdlog_logger.cc +++ b/ucm/shared/infra/logger/cc/spdlog_logger.cc @@ -31,9 +31,14 @@ #include #include "compress_rotate_file_sink.h" #include "logger.h" - namespace UC::Logger { - +constexpr uint64_t LIMIT_THRESHOLD_MS = 60000; +constexpr uint32_t RATE_LIMIT_MAX_LOGS_PER_WINDOW = 3; +constexpr uint32_t kRateLimitCountBits = 2; +constexpr uint64_t kRateLimitCountMask = (1u << kRateLimitCountBits) - 1u; +constexpr size_t kHashMixMagic = 0x9e3779b97f4a7c15ULL; +constexpr size_t kHashShiftLeft = 12; +constexpr size_t kHashShiftRight = 4; static spdlog::level::level_enum SpdLevels[] = {spdlog::level::debug, spdlog::level::info, spdlog::level::warn, spdlog::level::err, spdlog::level::critical}; @@ -45,6 +50,81 @@ void Logger::Log(Level&& lv, SourceLocation&& loc, std::string&& msg) this->logger_->log(spdlog::source_loc{loc.file, loc.line, loc.func}, level, std::move(msg)); } +inline uint64_t GetCurrentTimeMs() +{ + auto now = std::chrono::steady_clock::now(); + auto ms = std::chrono::time_point_cast(now); + return ms.time_since_epoch().count(); +} + +bool Logger::FilterCallSite(const char* file, int line) +{ + uint64_t now = GetCurrentTimeMs(); + const std::string_view fv(file); + std::hash h; + size_t x = h(fv); + x ^= static_cast(line) + kHashMixMagic + (x << kHashShiftLeft) + (x >> kHashShiftRight); + const uint64_t full_hash = static_cast(x); + const size_t slot_idx = static_cast(full_hash % HASH_SLOT_NUM); + // key_tag=0 is reserved for empty; so shift by +1. + const uint64_t key_tag = full_hash + 1u; + + auto& slot = hash_slots_[slot_idx]; + std::atomic* rate_state = nullptr; + + // 1) Lookup: find an existing chain entry with the same key. + for (size_t i = 0; i < HASH_CHAIN_LEN; ++i) { + uint64_t stored = slot.chain_entries[i].key_hash.load(std::memory_order_relaxed); + if (stored == key_tag) { + rate_state = &slot.chain_entries[i].rate_limit_state; + break; + } + } + + // 2) Insert: if key not found, try to claim an empty entry. + if (rate_state == nullptr) { + for (size_t i = 0; i < HASH_CHAIN_LEN; ++i) { + uint64_t expected_empty = 0; + if (slot.chain_entries[i].key_hash.compare_exchange_strong(expected_empty, key_tag, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + rate_state = &slot.chain_entries[i].rate_limit_state; + break; + } + } + } + + // 3) Evict: if the chain is full, overwrite a deterministic entry. + if (rate_state == nullptr) { + const size_t evict_idx = static_cast(key_tag % HASH_CHAIN_LEN); + rate_state = &slot.chain_entries[evict_idx].rate_limit_state; + slot.chain_entries[evict_idx].key_hash.store(key_tag, std::memory_order_relaxed); + slot.chain_entries[evict_idx].rate_limit_state.store(0, std::memory_order_relaxed); + } + + uint64_t s = rate_state->load(std::memory_order_relaxed); + const uint64_t window_start = s >> kRateLimitCountBits; + const uint32_t count = static_cast(s & kRateLimitCountMask); + + if (s == 0 || now - window_start > LIMIT_THRESHOLD_MS) { + const uint64_t desired = (now << kRateLimitCountBits) | 1u; + if (rate_state->compare_exchange_strong(s, desired, std::memory_order_relaxed, + std::memory_order_relaxed)) { + return true; + } + return false; + } + + if (count >= RATE_LIMIT_MAX_LOGS_PER_WINDOW) { return false; } + const uint64_t desired = + (window_start << kRateLimitCountBits) | static_cast(count + 1u); + if (rate_state->compare_exchange_strong(s, desired, std::memory_order_relaxed, + std::memory_order_relaxed)) { + return true; + } + return false; +} + std::shared_ptr Logger::Make() { if (this->logger_) { return this->logger_; } diff --git a/ucm/shared/infra/logger/cc/spdlog_logger.h b/ucm/shared/infra/logger/cc/spdlog_logger.h index 53743ab0e..7da7ffe06 100644 --- a/ucm/shared/infra/logger/cc/spdlog_logger.h +++ b/ucm/shared/infra/logger/cc/spdlog_logger.h @@ -23,11 +23,17 @@ * */ #ifndef UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H #define UNIFIEDCACHE_INFRA_LOGGER_SPDLOG_LOGGER_H +#include +#include +#include #include #include +#include #include namespace UC::Logger { +constexpr size_t HASH_SLOT_NUM = 512; +constexpr size_t HASH_CHAIN_LEN = 4; enum class Level { DEBUG, INFO, WARN, ERROR, CRITICAL }; struct SourceLocation { const char* file = ""; @@ -68,7 +74,20 @@ class Logger { bool IsEnabledFor(Level lv); + bool FilterCallSite(const char* file, int line); + private: + struct ChainEntryData { + std::atomic key_hash{0}; + std::atomic rate_limit_state{0}; + }; + + struct SlotData { + std::array chain_entries; + }; + + std::array hash_slots_; + std::shared_ptr Make(); std::string path_{"log"}; int max_files_{3}; diff --git a/ucm/shared/infra/logger/cpy/spdlog_logger.py.cc b/ucm/shared/infra/logger/cpy/spdlog_logger.py.cc index f748063a9..568397f8d 100644 --- a/ucm/shared/infra/logger/cpy/spdlog_logger.py.cc +++ b/ucm/shared/infra/logger/cpy/spdlog_logger.py.cc @@ -32,11 +32,17 @@ void LogWrapper(Level lv, std::string file, std::string func, int line, std::str Log(std::move(lv), std::move(file), std::move(func), line, std::move(msg)); } +void RateLimitLogWrapper(Level lv, std::string file, std::string func, int line, std::string msg) +{ + LogRateLimit(std::move(lv), std::move(file), std::move(func), line, std::move(msg)); +} + PYBIND11_MODULE(ucmlogger, m) { m.def("setup", &Setup); m.def("flush", &Flush); m.def("log", &LogWrapper); + m.def("log_rate_limit", &RateLimitLogWrapper); m.def("isEnabledFor", &isEnabledFor); py::enum_(m, "Level") .value("DEBUG", Level::DEBUG) diff --git a/ucm/shared/infra/logger/logger.cc b/ucm/shared/infra/logger/logger.cc index 6dd7b636b..c96ff531a 100644 --- a/ucm/shared/infra/logger/logger.cc +++ b/ucm/shared/infra/logger/logger.cc @@ -33,6 +33,14 @@ void Log(Level lv, std::string file, std::string func, int line, std::string msg std::move(msg)); } +void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg) +{ + if (Logger::GetInstance().FilterCallSite(file.c_str(), line)) { + Logger::GetInstance().Log(std::move(lv), SourceLocation{file.c_str(), func.c_str(), line}, + std::move(msg)); + } +} + void Setup(const std::string& path, int max_files, int max_size) { Logger::GetInstance().Setup(path, max_files, max_size); diff --git a/ucm/shared/infra/logger/logger.h b/ucm/shared/infra/logger/logger.h index 61244f0d2..81e6e586e 100644 --- a/ucm/shared/infra/logger/logger.h +++ b/ucm/shared/infra/logger/logger.h @@ -32,6 +32,7 @@ namespace UC::Logger { void Log(Level lv, std::string file, std::string func, int line, std::string msg); +void LogRateLimit(Level lv, std::string file, std::string func, int line, std::string msg); template void Log(Level lv, const SourceLocation& loc, fmt::format_string fmt, Args&&... args) @@ -39,16 +40,31 @@ void Log(Level lv, const SourceLocation& loc, fmt::format_string fmt, A std::string msg = fmt::format(fmt, std::forward(args)...); Log(lv, std::string(loc.file), std::string(loc.func), loc.line, std::move(msg)); } + +template +void LogRateLimit(Level lv, const SourceLocation& loc, fmt::format_string fmt, + Args&&... args) +{ + std::string msg = fmt::format(fmt, std::forward(args)...); + LogRateLimit(lv, std::string(loc.file), std::string(loc.func), loc.line, std::move(msg)); +} + void Setup(const std::string& path, int max_files, int max_size); void Flush(); bool isEnabledFor(Level lv); } // namespace UC::Logger #define UC_SOURCE_LOCATION {__FILE__, __FUNCTION__, __LINE__} -#define UC_LOG(lv, fmt, ...) UC::Logger::Log(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__) +#define UC_LOG_UNLIMITED(lv, fmt, ...) \ + UC::Logger::Log(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__) +#define UC_LOG(lv, fmt, ...) \ + UC::Logger::LogRateLimit(lv, UC_SOURCE_LOCATION, FMT_STRING(fmt), ##__VA_ARGS__) +#define UC_DEBUG_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__) +#define UC_INFO_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__) +#define UC_WARN_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__) +#define UC_ERROR_UNLIMITED(fmt, ...) UC_LOG_UNLIMITED(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__) #define UC_DEBUG(fmt, ...) UC_LOG(UC::Logger::Level::DEBUG, fmt, ##__VA_ARGS__) #define UC_INFO(fmt, ...) UC_LOG(UC::Logger::Level::INFO, fmt, ##__VA_ARGS__) #define UC_WARN(fmt, ...) UC_LOG(UC::Logger::Level::WARN, fmt, ##__VA_ARGS__) #define UC_ERROR(fmt, ...) UC_LOG(UC::Logger::Level::ERROR, fmt, ##__VA_ARGS__) - #endif diff --git a/ucm/shared/test/case/infra/logger/logger_perf_test.cc b/ucm/shared/test/case/infra/logger/logger_perf_test.cc new file mode 100644 index 000000000..ba0538ac3 --- /dev/null +++ b/ucm/shared/test/case/infra/logger/logger_perf_test.cc @@ -0,0 +1,223 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "logger/logger.h" + +using namespace UC::Logger; + +namespace { +void CleanDir(const std::string& path) +{ + std::error_code ec; + std::filesystem::remove_all(path, ec); + if (ec) { + std::cerr << "Failed to remove file: " << path << std::endl; + std::cerr << "Error: " << ec.message() << std::endl; + std::exit(1); + } +} +} // namespace + +class UCLoggerPerfTest : public ::testing::Test { +protected: + static void SetUpTestSuite() + { + CleanDir(test_log_dir_); + std::filesystem::create_directories(test_log_dir_); + std::cout << "test_log_path_: " << test_log_path_ << std::endl; + logger_ = &Logger::GetInstance(); + logger_->Setup(test_log_dir_, 3, 1); // 3 files, 1MB max size + } + + static void TearDownTestSuite() + { + CleanDir(test_log_dir_); + spdlog::drop_all(); + } + + static inline std::string test_log_dir_ = "log_perf_test"; + static inline std::string test_log_path_ = "log_perf_test/test_log.log"; + static inline Logger* logger_ = nullptr; +}; +namespace { +static inline void PerfLogInfo() { UC_INFO_UNLIMITED("uc_logger_perf_same_site"); } + +static inline void PerfLogInfoLimit() { UC_INFO("uc_logger_perf_same_site"); } + +static inline void PerfLogInfoRandom(const std::string& content) +{ + Log(Level::INFO, "logger_perf_random.cc", "PerfLogInfoRandom", 100, std::move(content)); +} +static inline void PerfLogInfoLimitRandom(const std::string& content) +{ + LogRateLimit(Level::INFO, "logger_perf_random.cc", "PerfLogInfoLimitRandom", 100, + std::string(content)); +} + +template +static double BenchmarkMultiThreadNsPerCall(int threads, int iterations, Fn&& fn) +{ + std::vector workers; + workers.reserve(threads); + + std::atomic ready{0}; + std::atomic go{false}; + + for (int tid = 0; tid < threads; ++tid) { + workers.emplace_back([&, tid] { + ready.fetch_add(1, std::memory_order_acq_rel); + while (!go.load(std::memory_order_acquire)) {} + for (int i = 0; i < iterations; ++i) { fn(tid, i); } + }); + } + + while (ready.load(std::memory_order_acquire) != threads) {} + + const auto begin = std::chrono::steady_clock::now(); + go.store(true, std::memory_order_release); + for (auto& t : workers) { t.join(); } + const auto end = std::chrono::steady_clock::now(); + + const auto total_ns = std::chrono::duration_cast(end - begin).count(); + const double total_calls = static_cast(threads) * static_cast(iterations); + return static_cast(total_ns) / total_calls; +} + +} // namespace + +TEST_F(UCLoggerPerfTest, MultiThreadPerfUCInfoVsRateLimit) +{ + auto spdlog_logger = spdlog::get("UC"); + ASSERT_NE(spdlog_logger, nullptr); + + // Keep benchmark focused on `UC_INFO` vs `UC_INFO_LIMIT` overhead. + // Message formatting still happens in the UC_* macros, but spdlog sinks should do nothing. + const auto old_level = spdlog_logger->level(); + spdlog_logger->set_level(spdlog::level::off); + + const unsigned hc = std::thread::hardware_concurrency(); + const int threads = static_cast(std::min(8, std::max(2, hc ? hc : 4))); + const int iterations_per_thread = 2000; + + // Warmup to stabilize instruction-cache and initial rate-limit cache path. + for (int i = 0; i < 32; ++i) { + PerfLogInfo(); + PerfLogInfoLimit(); + } + + const double ns_per_call_info_limit = BenchmarkMultiThreadNsPerCall( + threads, iterations_per_thread, [](int /*tid*/, int /*i*/) { PerfLogInfoLimit(); }); + const double ns_per_call_info = BenchmarkMultiThreadNsPerCall( + threads, iterations_per_thread, [](int /*tid*/, int /*i*/) { PerfLogInfo(); }); + + spdlog_logger->set_level(old_level); + + const double ratio = ns_per_call_info_limit / ns_per_call_info; + RecordProperty("uc_info_avg_ns_per_call", std::to_string(ns_per_call_info)); + RecordProperty("uc_info_limit_avg_ns_per_call", std::to_string(ns_per_call_info_limit)); + RecordProperty("uc_info_limit_over_uc_info_ratio", std::to_string(ratio)); + + std::cout << "[UCLoggerPerf] threads=" << threads + << " iterations_per_thread=" << iterations_per_thread + << " uc_info(ns/call)=" << ns_per_call_info + << " uc_info_limit(ns/call)=" << ns_per_call_info_limit << " ratio=" << ratio + << std::endl; + + // The test is informational: it reports the ratio so you can confirm + // whether `UC_INFO_LIMIT` is worse under concurrent call patterns. + ASSERT_TRUE(ratio <= 1.1); +} + +TEST_F(UCLoggerPerfTest, MultiThreadPerfUCInfoVsRateLimitRandomContent) +{ + auto spdlog_logger = spdlog::get("UC"); + ASSERT_NE(spdlog_logger, nullptr); + + const auto old_level = spdlog_logger->level(); + spdlog_logger->set_level(spdlog::level::off); + + const unsigned hc = std::thread::hardware_concurrency(); + const int threads = static_cast(std::min(8, std::max(2, hc ? hc : 4))); + const int iterations_per_thread = 2000; + + // Pre-generate totally random payloads per thread/call so that the final + // log message content is different for nearly every invocation. + std::vector> payloads(threads, + std::vector(iterations_per_thread)); + std::mt19937_64 rng(123456789ULL); + std::uniform_int_distribution len_dist(16, 64); + std::uniform_int_distribution ch_dist(0, 61); // [0-9A-Za-z] + + auto gen_char = [&](int v) -> char { + if (v < 10) { return static_cast('0' + v); } + v -= 10; + if (v < 26) { return static_cast('A' + v); } + v -= 26; + return static_cast('a' + v); + }; + + for (int t = 0; t < threads; ++t) { + for (int i = 0; i < iterations_per_thread; ++i) { + const int len = len_dist(rng); + std::string s; + s.reserve(static_cast(len)); + for (int k = 0; k < len; ++k) { s.push_back(gen_char(ch_dist(rng))); } + payloads[t][i] = std::move(s); + } + } + + // Warmup with random content as well, cycling through the 200 templates. + for (int i = 0; i < 32; ++i) { + PerfLogInfoRandom(payloads[0][i]); + PerfLogInfoLimitRandom(payloads[0][i]); + } + + const double ns_per_call_info_limit = BenchmarkMultiThreadNsPerCall( + threads, iterations_per_thread, + [&](int tid, int i) { PerfLogInfoLimitRandom(payloads[tid][i]); }); + const double ns_per_call_info = + BenchmarkMultiThreadNsPerCall(threads, iterations_per_thread, + [&](int tid, int i) { PerfLogInfoRandom(payloads[tid][i]); }); + + spdlog_logger->set_level(old_level); + + const double ratio = ns_per_call_info_limit / ns_per_call_info; + RecordProperty("uc_info_random_avg_ns_per_call", std::to_string(ns_per_call_info)); + RecordProperty("uc_info_limit_random_avg_ns_per_call", std::to_string(ns_per_call_info_limit)); + RecordProperty("uc_info_limit_random_over_uc_info_random_ratio", std::to_string(ratio)); + std::cout << "[UCLoggerPerf] threads=" << threads + << " iterations_per_thread=" << iterations_per_thread + << " uc_info_random_content(ns/call)=" << ns_per_call_info + << " uc_info_limit_random_content(ns/call)=" << ns_per_call_info_limit + << " ratio=" << ratio << std::endl; + ASSERT_TRUE(ratio <= 1.1); +}