diff --git a/src/v/io/BUILD b/src/v/io/BUILD index ad58ee0ba1dcb..33463300263cd 100644 --- a/src/v/io/BUILD +++ b/src/v/io/BUILD @@ -15,7 +15,6 @@ redpanda_cc_library( "scheduler.cc", ], hdrs = [ - "cache.h", "interval_map.h", "io_queue.h", "page.h", @@ -33,6 +32,7 @@ redpanda_cc_library( "//src/v/container:intrusive", "//src/v/ssx:future_util", "//src/v/ssx:semaphore", + "//src/v/utils:s3_fifo", "@abseil-cpp//absl/container:btree", "@abseil-cpp//absl/container:flat_hash_map", "@boost//:intrusive", diff --git a/src/v/io/page.cc b/src/v/io/page.cc index a1e2439b98043..8439c25a60c94 100644 --- a/src/v/io/page.cc +++ b/src/v/io/page.cc @@ -22,7 +22,7 @@ page::page(uint64_t offset, seastar::temporary_buffer data) page::page( uint64_t offset, seastar::temporary_buffer data, - const class cache_hook& hook) + const class utils::s3_fifo::cache_hook& hook) : cache_hook(hook) , offset_(offset) , size_(data.size()) diff --git a/src/v/io/page.h b/src/v/io/page.h index 4a711e6ba5420..8c811311b8c39 100644 --- a/src/v/io/page.h +++ b/src/v/io/page.h @@ -11,7 +11,7 @@ #pragma once #include "container/intrusive_list_helpers.h" -#include "io/cache.h" +#include "utils/s3_fifo.h" #include #include @@ -40,7 +40,7 @@ class page : public seastar::enable_lw_shared_from_this { page( uint64_t offset, seastar::temporary_buffer data, - const cache_hook& hook); + const utils::s3_fifo::cache_hook& hook); page(const page&) = delete; page& operator=(const page&) = delete; @@ -123,7 +123,7 @@ class page : public seastar::enable_lw_shared_from_this { * Page cache entry intrusive list hook. */ // NOLINTNEXTLINE(*-non-private-member-variables-in-classes) - cache_hook cache_hook; + utils::s3_fifo::cache_hook cache_hook; struct waiter { intrusive_list_hook waiter; diff --git a/src/v/io/page_cache.h b/src/v/io/page_cache.h index dee1d7bdb96d5..e4792f6f40df6 100644 --- a/src/v/io/page_cache.h +++ b/src/v/io/page_cache.h @@ -9,8 +9,8 @@ * by the Apache License, Version 2.0 */ #pragma once -#include "io/cache.h" #include "io/page.h" +#include "utils/s3_fifo.h" namespace experimental::io { @@ -26,7 +26,8 @@ class page_cache { size_t operator()(const page&) noexcept; }; - using cache_type = cache; + using cache_type + = utils::s3_fifo::cache; public: using config = cache_type::config; diff --git a/src/v/io/pager.cc b/src/v/io/pager.cc index eaab1a6a55a77..8eb8ec5329539 100644 --- a/src/v/io/pager.cc +++ b/src/v/io/pager.cc @@ -45,8 +45,8 @@ seastar::future<> pager::close() noexcept { } } -seastar::lw_shared_ptr -pager::alloc_page(uint64_t offset, std::optional hook) noexcept { +seastar::lw_shared_ptr pager::alloc_page( + uint64_t offset, std::optional hook) noexcept { auto buf = seastar::temporary_buffer::aligned(page_size, page_size); if (hook.has_value()) { return seastar::make_lw_shared( diff --git a/src/v/io/pager.h b/src/v/io/pager.h index f26b5521992a2..10e5aa0879ee7 100644 --- a/src/v/io/pager.h +++ b/src/v/io/pager.h @@ -10,9 +10,9 @@ */ #pragma once -#include "io/cache.h" #include "io/page_set.h" #include "io/scheduler.h" +#include "utils/s3_fifo.h" #include #include @@ -92,7 +92,8 @@ class pager { private: static seastar::lw_shared_ptr alloc_page( - uint64_t offset, std::optional hook = std::nullopt) noexcept; + uint64_t offset, + std::optional hook = std::nullopt) noexcept; /* * Read a page from the underlying file. diff --git a/src/v/io/tests/BUILD b/src/v/io/tests/BUILD index 1cf407432fbbb..851e19422dc4d 100644 --- a/src/v/io/tests/BUILD +++ b/src/v/io/tests/BUILD @@ -29,16 +29,6 @@ redpanda_cc_gtest( ], ) -redpanda_cc_gtest( - name = "cache_test", - timeout = "short", - srcs = ["cache_test.cc"], - deps = [ - "//src/v/io", - "//src/v/test_utils:gtest", - ], -) - redpanda_cc_gtest( name = "interval_map_test", timeout = "short", diff --git a/src/v/io/tests/CMakeLists.txt b/src/v/io/tests/CMakeLists.txt index 3c219aa7ed429..83a491f42f279 100644 --- a/src/v/io/tests/CMakeLists.txt +++ b/src/v/io/tests/CMakeLists.txt @@ -6,7 +6,6 @@ rp_test( SOURCES common.cc common_test.cc - cache_test.cc interval_map_test.cc persistence_test.cc page_test.cc diff --git a/src/v/utils/BUILD b/src/v/utils/BUILD index 8d8901611e742..39549b8449103 100644 --- a/src/v/utils/BUILD +++ b/src/v/utils/BUILD @@ -652,3 +652,28 @@ redpanda_cc_library( "@seastar", ], ) + +redpanda_cc_library( + name = "s3_fifo", + hdrs = [ + "s3_fifo.h", + ], + include_prefix = "utils", + deps = [ + "@boost//:intrusive", + "@fmt", + ], +) + +redpanda_cc_library( + name = "chunked_kv_cache", + hdrs = [ + "chunked_kv_cache.h", + ], + include_prefix = "utils", + deps = [ + "//src/v/container:chunked_hash_map", + "//src/v/utils:s3_fifo", + "@boost//:intrusive", + ], +) diff --git a/src/v/utils/chunked_kv_cache.h b/src/v/utils/chunked_kv_cache.h new file mode 100644 index 0000000000000..aa0a2c8e0c193 --- /dev/null +++ b/src/v/utils/chunked_kv_cache.h @@ -0,0 +1,195 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "container/chunked_hash_map.h" +#include "utils/s3_fifo.h" + +#include +#include + +#include +#include +#include + +namespace utils { + +/** + * A basic key-value cache implementation built on top of the s3_fifo::cache. + */ +template< + typename Key, + typename Value, + typename Hash = std::conditional_t< + detail::has_absl_hash, + detail::avalanching_absl_hash, + ankerl::unordered_dense::hash>, + typename EqualTo = std::equal_to> +class chunked_kv_cache { + struct cached_value; + struct evict; + using cache_t = s3_fifo::cache< + cached_value, + &cached_value::hook, + evict, + s3_fifo::default_cache_cost>; + +public: + using config = cache_t::config; + + explicit chunked_kv_cache(config config) + : _cache{config, evict{*this}} {} + + ~chunked_kv_cache() noexcept = default; + + // These contructors need to be deleted to ensure a stable `this` pointer. + chunked_kv_cache(chunked_kv_cache&&) = delete; + chunked_kv_cache& operator=(chunked_kv_cache&&) noexcept = delete; + chunked_kv_cache(const chunked_kv_cache&) = delete; + chunked_kv_cache& operator=(const chunked_kv_cache&) noexcept = delete; + + /** + * Inserts a value for a given key into the cache. + * + * Returns true if the value was inserted and false if there was already a + * value for the given key in the cache. + */ + bool try_insert(const Key& key, ss::shared_ptr val); + + /** + * Gets the key's corresponding value from the cache. + * + * Returns std::nullopt if the key doesn't have a value in the cache. + */ + ss::optimized_optional> get_value(const Key& key); + + using cache_stat = struct cache_t::stat; + /** + * Cache statistics. + */ + struct stat : public cache_stat { + /// Current size of the cache index. + size_t index_size; + }; + + /** + * Returns the current cache statistics. + */ + [[nodiscard]] stat stat() const noexcept; + +private: + using ghost_hook_t = boost::intrusive::list_member_hook< + boost::intrusive::link_mode>; + + struct cached_value { + Key key; + ss::shared_ptr value; + s3_fifo::cache_hook hook; + ghost_hook_t ghost_hook; + }; + + using entry_t = std::unique_ptr; + using ghost_fifo_t = boost::intrusive::list< + cached_value, + boost::intrusive:: + member_hook>; + + chunked_hash_map _map; + cache_t _cache; + ghost_fifo_t _ghost_fifo; + + void gc_ghost_fifo(); +}; + +template +struct chunked_kv_cache::evict { + chunked_kv_cache& kv_c; + + bool operator()(cached_value& e) noexcept { + e.value = nullptr; + kv_c._ghost_fifo.push_back(e); + return true; + } +}; + +template +bool chunked_kv_cache::try_insert( + const Key& key, ss::shared_ptr val) { + gc_ghost_fifo(); + + auto e_it = _map.find(key); + if (e_it == _map.end()) { + auto [e_it, succ] = _map.try_emplace( + key, std::make_unique(key, std::move(val))); + if (!succ) { + return false; + } + + _cache.insert(*e_it->second); + return true; + } + + auto& entry = *e_it->second; + if (entry.hook.evicted()) { + entry.value = std::move(val); + _ghost_fifo.erase(_ghost_fifo.iterator_to(entry)); + _cache.insert(entry); + return true; + } + + return false; +} + +template +ss::optimized_optional> +chunked_kv_cache::get_value(const Key& key) { + gc_ghost_fifo(); + + auto e_it = _map.find(key); + if (e_it == _map.end()) { + return std::nullopt; + } + + auto& entry = *e_it->second; + if (entry.hook.evicted()) { + return std::nullopt; + } + + entry.hook.touch(); + return entry.value; +} + +template +void chunked_kv_cache::gc_ghost_fifo() { + for (auto it = _ghost_fifo.begin(); it != _ghost_fifo.end();) { + auto& entry = *it; + if (_cache.ghost_queue_contains(entry)) { + // The ghost queue is in fifo-order so any entry that comes after an + // entry that hasn't been evicted will also not be evicted. + return; + } + + it = _ghost_fifo.erase(it); + _map.erase(entry.key); + } +} +template +struct chunked_kv_cache::stat +chunked_kv_cache::stat() const noexcept { + struct stat s { + _cache.stat() + }; + s.index_size = _map.size(); + return s; +} + +} // namespace utils diff --git a/src/v/io/cache.h b/src/v/utils/s3_fifo.h similarity index 98% rename from src/v/io/cache.h rename to src/v/utils/s3_fifo.h index d2f82eea84e1a..c29cf737b751f 100644 --- a/src/v/io/cache.h +++ b/src/v/utils/s3_fifo.h @@ -175,7 +175,7 @@ * @{ */ -namespace experimental::io { +namespace utils::s3_fifo { namespace testing_details { class cache_hook_accessor; @@ -674,18 +674,18 @@ bool cache::evict_main() noexcept { * @} */ -} // namespace experimental::io +} // namespace utils::s3_fifo template< typename T, - experimental::io::cache_hook T::*Hook, - experimental::io::cache_evictor Evictor, - experimental::io::cache_cost Cost> -struct fmt::formatter> + utils::s3_fifo::cache_hook T::*Hook, + utils::s3_fifo::cache_evictor Evictor, + utils::s3_fifo::cache_cost Cost> +struct fmt::formatter> : fmt::formatter { template auto format( - const experimental::io::cache& cache, + const utils::s3_fifo::cache& cache, FormatContext& ctx) const { const auto stat = cache.stat(); return fmt::format_to( diff --git a/src/v/utils/tests/BUILD b/src/v/utils/tests/BUILD index d607bbec47e3a..b8b72f8875223 100644 --- a/src/v/utils/tests/BUILD +++ b/src/v/utils/tests/BUILD @@ -513,3 +513,24 @@ redpanda_cc_gtest( "@seastar", ], ) + +redpanda_cc_gtest( + name = "s3_fifo_test", + timeout = "short", + srcs = ["s3_fifo_test.cc"], + deps = [ + "//src/v/test_utils:gtest", + "//src/v/utils:s3_fifo", + ], +) + +redpanda_cc_gtest( + name = "chunked_kv_cache_test", + timeout = "short", + srcs = ["chunked_kv_cache_test.cc"], + deps = [ + "//src/v/test_utils:gtest", + "//src/v/utils:chunked_kv_cache", + "@seastar", + ], +) diff --git a/src/v/utils/tests/CMakeLists.txt b/src/v/utils/tests/CMakeLists.txt index 3623a6c27be19..af5f850fb5f8b 100644 --- a/src/v/utils/tests/CMakeLists.txt +++ b/src/v/utils/tests/CMakeLists.txt @@ -31,6 +31,8 @@ rp_test( BINARY_NAME gtest_utils_single_thread SOURCES external_process_test.cc + s3_fifo_test.cc + chunked_kv_cache_test.cc LIBRARIES v::utils v::gtest_main diff --git a/src/v/utils/tests/chunked_kv_cache_test.cc b/src/v/utils/tests/chunked_kv_cache_test.cc new file mode 100644 index 0000000000000..cdafdbe702257 --- /dev/null +++ b/src/v/utils/tests/chunked_kv_cache_test.cc @@ -0,0 +1,122 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "test_utils/test.h" +#include "utils/chunked_kv_cache.h" + +#include + +TEST(ChunkedKVTest, InsertGetTest) { + using cache_type = utils::chunked_kv_cache; + + cache_type cache(cache_type::config{.cache_size = 2, .small_size = 1}); + auto str = "avaluestr"; + + EXPECT_EQ(cache.try_insert(1, ss::make_shared(str)), true); + auto v = cache.get_value(1); + EXPECT_TRUE(v); + EXPECT_EQ(**v, str); +} + +TEST(ChunkedKVTest, InvalidGetTest) { + using cache_type = utils::chunked_kv_cache; + + cache_type cache(cache_type::config{.cache_size = 2, .small_size = 1}); + + EXPECT_EQ(cache.get_value(0), std::nullopt); +} + +TEST(ChunkedKVTest, ReinsertionTest) { + using cache_type = utils::chunked_kv_cache; + + cache_type cache(cache_type::config{.cache_size = 2, .small_size = 1}); + auto str = "avaluestr"; + + EXPECT_EQ(cache.try_insert(0, ss::make_shared(str)), true); + auto stat = cache.stat(); + EXPECT_EQ(stat.main_queue_size, 0); + EXPECT_EQ(stat.small_queue_size, 1); + EXPECT_EQ(stat.index_size, 1); + + EXPECT_EQ(cache.try_insert(0, ss::make_shared(str)), false); + stat = cache.stat(); + EXPECT_EQ(stat.main_queue_size, 0); + EXPECT_EQ(stat.small_queue_size, 1); + EXPECT_EQ(stat.index_size, 1); +} + +TEST(ChunkedKVTest, EvictionTest) { + using cache_type = utils::chunked_kv_cache; + + cache_type cache(cache_type::config{.cache_size = 2, .small_size = 1}); + auto str = "avaluestr"; + + // Initial phase without any eviction. The `s3_fifo` cache allows for its + // max size to be exceeded by one. + for (int i = 0; i < 3; i++) { + EXPECT_EQ(cache.try_insert(i, ss::make_shared(str)), true); + auto stat = cache.stat(); + EXPECT_EQ(stat.main_queue_size, 0); + EXPECT_EQ(stat.small_queue_size, i + 1); + EXPECT_EQ(stat.index_size, i + 1); + } + + // Next phase with evictions and one ghost queue entry. Ensures that entries + // that have been removed from the ghost queue are also removed from the + // index. + for (int i = 3; i < 10; i++) { + EXPECT_EQ(cache.try_insert(i, ss::make_shared(str)), true); + auto stat = cache.stat(); + EXPECT_EQ(stat.main_queue_size, 0); + EXPECT_EQ(stat.small_queue_size, 3); + EXPECT_EQ(stat.index_size, 4); + } +} + +TEST(ChunkedKVTest, GhostToMainTest) { + using cache_type = utils::chunked_kv_cache; + + cache_type cache(cache_type::config{.cache_size = 4, .small_size = 1}); + auto str = "avaluestr"; + + // Fill the cache + for (int i = 0; i < 5; i++) { + EXPECT_EQ(cache.try_insert(i, ss::make_shared(str)), true); + auto stat = cache.stat(); + EXPECT_EQ(stat.main_queue_size, 0); + EXPECT_EQ(stat.small_queue_size, i + 1); + EXPECT_EQ(stat.index_size, i + 1); + } + + // Move one entry to the ghost queue + EXPECT_EQ(cache.try_insert(5, ss::make_shared(str)), true); + auto stat = cache.stat(); + EXPECT_EQ(stat.main_queue_size, 0); + EXPECT_EQ(stat.small_queue_size, 5); + EXPECT_EQ(stat.index_size, 6); + + // Get the key for the entry in the ghost queue. + int key = -1; + for (int i = 0; i < 5; i++) { + if (!cache.get_value(i)) { + key = i; + break; + } + } + + EXPECT_NE(key, -1); + + // Move entry from ghost queue to main queue. + EXPECT_EQ(cache.try_insert(key, ss::make_shared(str)), true); + stat = cache.stat(); + EXPECT_EQ(stat.main_queue_size, 1); + EXPECT_EQ(stat.small_queue_size, 4); + EXPECT_EQ(stat.index_size, 6); +} diff --git a/src/v/io/tests/cache_test.cc b/src/v/utils/tests/s3_fifo_test.cc similarity index 95% rename from src/v/io/tests/cache_test.cc rename to src/v/utils/tests/s3_fifo_test.cc index a8b5af31b9ada..3580613a141e8 100644 --- a/src/v/io/tests/cache_test.cc +++ b/src/v/utils/tests/s3_fifo_test.cc @@ -8,29 +8,29 @@ * the Business Source License, use of this software will be governed * by the Apache License, Version 2.0 */ -#include "io/cache.h" #include "test_utils/test.h" +#include "utils/s3_fifo.h" #include -namespace experimental::io::testing_details { +namespace utils::s3_fifo::testing_details { class cache_hook_accessor { public: static std::optional - get_hook_insertion_time(const io::cache_hook& hook) { + get_hook_insertion_time(const utils::s3_fifo::cache_hook& hook) { return hook.ghost_insertion_time_; } - static void - set_hook_insertion_time(io::cache_hook& hook, std::optional time) { + static void set_hook_insertion_time( + utils::s3_fifo::cache_hook& hook, std::optional time) { hook.ghost_insertion_time_ = time; } - static uint8_t get_hook_freq(io::cache_hook& hook) { return hook.freq_; } + static uint8_t get_hook_freq(utils::s3_fifo::cache_hook& hook) { + return hook.freq_; + } }; -} // namespace experimental::io::testing_details - -namespace io = experimental::io; +} // namespace utils::s3_fifo::testing_details class CacheTest : public ::testing::Test { public: @@ -39,7 +39,7 @@ class CacheTest : public ::testing::Test { static constexpr auto main_size = cache_size - small_size; struct entry { - io::cache_hook hook; + utils::s3_fifo::cache_hook hook; bool may_evict{true}; bool evicted{false}; }; @@ -58,7 +58,8 @@ class CacheTest : public ::testing::Test { size_t operator()(const entry& /*entry*/) noexcept { return 1; } }; - using cache_type = io::cache; + using cache_type + = utils::s3_fifo::cache; void SetUp() override { cache = std::make_unique(cache_type::config{ @@ -78,19 +79,19 @@ class CacheTest : public ::testing::Test { } static std::optional get_hook_insertion_time(const entry& entry) { - return io::testing_details::cache_hook_accessor:: + return utils::s3_fifo::testing_details::cache_hook_accessor:: get_hook_insertion_time(entry.hook); } static void set_hook_insertion_time(entry& entry, std::optional time) { - io::testing_details::cache_hook_accessor::set_hook_insertion_time( - entry.hook, time); + utils::s3_fifo::testing_details::cache_hook_accessor:: + set_hook_insertion_time(entry.hook, time); } static uint8_t get_hook_freq(entry& entry) { - return io::testing_details::cache_hook_accessor::get_hook_freq( - entry.hook); + return utils::s3_fifo::testing_details::cache_hook_accessor:: + get_hook_freq(entry.hook); } template @@ -700,7 +701,7 @@ TEST_F(CacheTest, Formattable) { TEST(CacheTestCustom, CustomCost) { struct entry { - io::cache_hook hook; + utils::s3_fifo::cache_hook hook; std::string data; }; @@ -713,8 +714,11 @@ TEST(CacheTestCustom, CustomCost) { constexpr auto cache_size = 100; constexpr auto small_size = 5; - using cache_type - = io::cache; + using cache_type = utils::s3_fifo::cache< + entry, + &entry::hook, + utils::s3_fifo::default_cache_evictor, + entry_cost>; cache_type cache( cache_type::config{.cache_size = cache_size, .small_size = small_size});