diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 206085b208a..dd6be2c9efd 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1776,6 +1776,32 @@ struct ScanOptions { : range(_start, _upper_bound) {} }; +class BlockBasedTable; +class PrefetchRateLimiter { + public: + PrefetchRateLimiter() = default; + virtual ~PrefetchRateLimiter() = default; + + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) = 0; + virtual bool release(size_t bytes) = 0; +}; + +class DefaultPrefetchRateLimiter : public PrefetchRateLimiter { + public: + explicit DefaultPrefetchRateLimiter(size_t max_bytes) + : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} + virtual ~DefaultPrefetchRateLimiter() override = default; + + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) override; + virtual bool release(size_t bytes) override; + + private: + const size_t max_bytes_; + std::atomic<size_t> cur_bytes_; +}; + // Container for multiple scan ranges that can be used with MultiScan. // This replaces std::vector<ScanOptions> with a more efficient implementation // that can merge overlapping ranges. 
@@ -1783,23 +1809,28 @@ class MultiScanArgs { public: // Constructor that takes a comparator explicit MultiScanArgs(const Comparator* comparator = BytewiseComparator()) - : comp_(comparator) {} + : prefetch_rate_limiter(nullptr), comp_(comparator) {} // Copy Constructor - MultiScanArgs(const MultiScanArgs& other) { - comp_ = other.comp_; - original_ranges_ = other.original_ranges_; - io_coalesce_threshold = other.io_coalesce_threshold; - } + MultiScanArgs(const MultiScanArgs& other) + : io_coalesce_threshold(other.io_coalesce_threshold), + prefetch_rate_limiter(other.prefetch_rate_limiter), + comp_(other.comp_), + original_ranges_(other.original_ranges_) {} + MultiScanArgs(MultiScanArgs&& other) noexcept : io_coalesce_threshold(other.io_coalesce_threshold), + prefetch_rate_limiter(std::move(other.prefetch_rate_limiter)), comp_(other.comp_), original_ranges_(std::move(other.original_ranges_)) {} MultiScanArgs& operator=(const MultiScanArgs& other) { - comp_ = other.comp_; - original_ranges_ = other.original_ranges_; - io_coalesce_threshold = other.io_coalesce_threshold; + if (this != &other) { + comp_ = other.comp_; + original_ranges_ = other.original_ranges_; + io_coalesce_threshold = other.io_coalesce_threshold; + prefetch_rate_limiter = other.prefetch_rate_limiter; + } return *this; } @@ -1808,6 +1839,7 @@ class MultiScanArgs { comp_ = other.comp_; original_ranges_ = std::move(other.original_ranges_); io_coalesce_threshold = other.io_coalesce_threshold; + prefetch_rate_limiter = std::move(other.prefetch_rate_limiter); } return *this; } @@ -1849,6 +1881,12 @@ class MultiScanArgs { uint64_t io_coalesce_threshold = 16 << 10; // 16KB by default + std::shared_ptr<PrefetchRateLimiter> prefetch_rate_limiter; + + PrefetchRateLimiter& GetMutablePrefetchRateLimiter() const { + return *prefetch_rate_limiter.get(); + } + private: // The comparator used for ordering ranges const Comparator* comp_; diff --git a/table/block_based/block_based_table_iterator.cc 
b/table/block_based/block_based_table_iterator.cc index a8d821e2c32..3a1a3a76b55 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -10,6 +10,56 @@ namespace ROCKSDB_NAMESPACE { +size_t DefaultPrefetchRateLimiter::acquire(const BlockBasedTable* /*unused*/, + size_t bytes, bool all_or_nothing) { + bool done = false; + size_t amount = 0; + // Quick check if we have nothing. + if (cur_bytes_ == 0) { + return amount; + } + while (!done) { + // Check again here. + size_t current = cur_bytes_.load(); + if (current == 0) { + amount = 0; + return amount; + } + if (all_or_nothing) { + if (current >= bytes) { + done = cur_bytes_.compare_exchange_weak(current, current - bytes); + amount = bytes; + } else { + amount = 0; + return amount; + } + } else { + if (current > bytes) { + done = cur_bytes_.compare_exchange_weak(current, current - bytes); + amount = bytes; + } else { + done = cur_bytes_.compare_exchange_weak(current, 0); + amount = current; + } + } + } + return amount; +} + +bool DefaultPrefetchRateLimiter::release(size_t bytes) { + bool done = false; + while (!done) { + // Check again here. + size_t current = cur_bytes_.load(); + if (current + bytes >= max_bytes_) { + done = cur_bytes_.compare_exchange_weak(current, max_bytes_); + } else { + done = cur_bytes_.compare_exchange_weak(current, current + bytes); + } + } + return true; +} + void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); } void BlockBasedTableIterator::Seek(const Slice& target) { @@ -984,6 +1034,7 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) { std::vector<BlockHandle> blocks_to_prepare; Status s; std::vector<std::pair<size_t, size_t>> block_ranges_per_scan; + total_acquired_ = 0; for (const auto& scan_opt : *scan_opts) { size_t num_blocks = 0; // Current scan overlap the last block of the previous scan. 
@@ -1000,6 +1051,16 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) { index_iter_->user_key(), /*a_has_ts*/ true, *scan_opt.range.limit, /*b_has_ts=*/false) <= 0)) { + // Lets make sure we are rate limited on how many blocks to prepare + if (multiscan_opts->prefetch_rate_limiter) { + auto blocks = multiscan_opts->GetMutablePrefetchRateLimiter().acquire( + table_, index_iter_->value().handle.size(), true); + total_acquired_ += blocks; + if (blocks == 0) { + break; + } + } + if (check_overlap && blocks_to_prepare.back() == index_iter_->value().handle) { // Skip the current block since it's already in the list @@ -1162,6 +1223,10 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) { } } + if (read_reqs.size() == 0) { + return; + } + AlignedBuf aligned_buf; s = table_->get_rep()->file.get()->MultiRead( io_opts, read_reqs.data(), read_reqs.size(), @@ -1345,6 +1410,14 @@ void BlockBasedTableIterator::FindBlockForwardInMultiScan() { } // Move to the next pinned data block ResetDataIter(); + if (multi_scan_->prefetch_rate_limiter) { + size_t releasing = + multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx] + .GetValue() + ->size(); + multi_scan_->prefetch_rate_limiter->release(releasing); + total_acquired_ -= releasing; + } ++multi_scan_->cur_data_block_idx; table_->NewDataBlockIterator( read_options_, diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index d31296fcf84..32ef25e9170 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -16,6 +16,7 @@ #include "table/block_based/reader_common.h" namespace ROCKSDB_NAMESPACE { + // Iterates over the contents of BlockBasedTable. 
class BlockBasedTableIterator : public InternalIteratorBase { // compaction_readahead_size: its value will only be used if for_compaction = @@ -47,7 +48,17 @@ class BlockBasedTableIterator : public InternalIteratorBase { is_last_level_(table->IsLastLevel()), block_iter_points_to_real_block_(false) {} - ~BlockBasedTableIterator() override { ClearBlockHandles(); } + ~BlockBasedTableIterator() override { + ClearBlockHandles(); + + // Release any acquired bytes from the rate limiter if we have a multi_scan_ + // Use the stored rate limiter copy to avoid accessing potentially invalid + // scan_opts + if (multi_scan_ && multi_scan_->prefetch_rate_limiter && + total_acquired_ > 0) { + multi_scan_->prefetch_rate_limiter->release(total_acquired_); + } + } void Seek(const Slice& target) override; void SeekForPrev(const Slice& target) override; @@ -373,6 +384,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { // *** END States used by both regular scan and multiscan // *** BEGIN MultiScan related states *** + size_t total_acquired_ = 0; + struct MultiScanState { // bool prepared_ = false; const MultiScanArgs* scan_opts; @@ -385,6 +398,10 @@ class BlockBasedTableIterator : public InternalIteratorBase { size_t next_scan_idx; size_t cur_data_block_idx; + // Store the rate limiter separately to avoid accessing potentially invalid + // scan_opts + std::shared_ptr prefetch_rate_limiter; + MultiScanState( const MultiScanArgs* _scan_opts, std::vector>&& _pinned_data_blocks, @@ -393,7 +410,9 @@ class BlockBasedTableIterator : public InternalIteratorBase { pinned_data_blocks(std::move(_pinned_data_blocks)), block_ranges_per_scan(std::move(_block_ranges_per_scan)), next_scan_idx(0), - cur_data_block_idx(0) {} + cur_data_block_idx(0), + prefetch_rate_limiter(_scan_opts ? 
_scan_opts->prefetch_rate_limiter + : nullptr) {} }; std::unique_ptr multi_scan_; diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 6f22965eb7d..52dfb2bd88f 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -30,6 +30,34 @@ namespace ROCKSDB_NAMESPACE { +// Simple barrier implementation for thread synchronization in tests +class Barrier { + public: + explicit Barrier(int count) : count_(count), waiting_(0), generation_(0) {} + + void arrive_and_wait() { + std::unique_lock lock(mutex_); + int gen = generation_; + + if (++waiting_ == count_) { + // Last thread to arrive - wake everyone up + waiting_ = 0; + ++generation_; + cv_.notify_all(); + } else { + // Wait for all threads to arrive + cv_.wait(lock, [this, gen] { return gen != generation_; }); + } + } + + private: + const int count_; + int waiting_; + int generation_; + std::mutex mutex_; + std::condition_variable cv_; +}; + class BlockBasedTableReaderBaseTest : public testing::Test { public: static constexpr int kBytesPerEntry = 256; @@ -1231,6 +1259,709 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(1, 2), ::testing::Values(0), ::testing::Values(false))); +// Tests for DefaultPrefetchRateLimiter +class DefaultPrefetchRateLimiterTest : public testing::Test { + public: + DefaultPrefetchRateLimiterTest() {} + ~DefaultPrefetchRateLimiterTest() override {} + + // Explicitly delete copy and move operations to satisfy Rule of Five + DefaultPrefetchRateLimiterTest(const DefaultPrefetchRateLimiterTest&) = + delete; + DefaultPrefetchRateLimiterTest& operator=( + const DefaultPrefetchRateLimiterTest&) = delete; + DefaultPrefetchRateLimiterTest(DefaultPrefetchRateLimiterTest&&) = delete; + DefaultPrefetchRateLimiterTest& operator=(DefaultPrefetchRateLimiterTest&&) = + delete; +}; + +TEST_F(DefaultPrefetchRateLimiterTest, BasicAcquireRelease) { + DefaultPrefetchRateLimiter 
limiter(10); + + // Test basic acquire + size_t acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 5); + + // Test release + bool released = limiter.release(3); + ASSERT_TRUE(released); + + // Test acquire after partial release + acquired = limiter.acquire(nullptr, 4, false); + + ASSERT_EQ(acquired, 4); +} + +TEST_F(DefaultPrefetchRateLimiterTest, AllOrNothingTrue) { + DefaultPrefetchRateLimiter limiter(10); + + // First acquire some blocks + size_t acquired = limiter.acquire(nullptr, 6, false); + ASSERT_EQ(acquired, 6); + + // Try to acquire more than available with all_or_nothing=true + acquired = limiter.acquire(nullptr, 5, true); + ASSERT_EQ(acquired, 0); // Should fail and return 0 + + // Try to acquire exactly what's available with all_or_nothing=true + acquired = limiter.acquire(nullptr, 4, true); + ASSERT_EQ(acquired, 4); // Should succeed +} + +TEST_F(DefaultPrefetchRateLimiterTest, AllOrNothingFalse) { + DefaultPrefetchRateLimiter limiter(10); + + // First acquire some blocks + size_t acquired = limiter.acquire(nullptr, 7, false); + ASSERT_EQ(acquired, 7); + + // Try to acquire more than available with all_or_nothing=false + acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 3); // Should return what's available + + // Now no blocks should be available + acquired = limiter.acquire(nullptr, 1, false); + ASSERT_EQ(acquired, 0); +} + +TEST_F(DefaultPrefetchRateLimiterTest, ReleaseExceedsMax) { + DefaultPrefetchRateLimiter limiter(10); + + // Acquire all blocks + size_t acquired = limiter.acquire(nullptr, 10, false); + ASSERT_EQ(acquired, 10); + + // Release more than what was acquired + bool released = limiter.release(15); + ASSERT_TRUE(released); + + // Should be capped at max_blocks_ + acquired = limiter.acquire(nullptr, 12, false); + ASSERT_EQ(acquired, 10); // Should only get max_blocks_ +} + +TEST_F(DefaultPrefetchRateLimiterTest, ZeroBlocksAvailable) { + DefaultPrefetchRateLimiter limiter(5); + + // Acquire all 
blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 5); + + // Try to acquire when none available + acquired = limiter.acquire(nullptr, 1, false); + ASSERT_EQ(acquired, 0); + + acquired = limiter.acquire(nullptr, 1, true); + ASSERT_EQ(acquired, 0); +} + +TEST_F(DefaultPrefetchRateLimiterTest, AcquireZeroBlocks) { + DefaultPrefetchRateLimiter limiter(10); + + // Acquire zero blocks + size_t acquired = limiter.acquire(nullptr, 0, false); + ASSERT_EQ(acquired, 0); + + acquired = limiter.acquire(nullptr, 0, true); + ASSERT_EQ(acquired, 0); + + // All blocks should still be available + acquired = limiter.acquire(nullptr, 10, false); + ASSERT_EQ(acquired, 10); +} + +TEST_F(DefaultPrefetchRateLimiterTest, ReleaseZeroBlocks) { + DefaultPrefetchRateLimiter limiter(10); + + // Acquire some blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 5); + + // Release zero blocks + bool released = limiter.release(0); + ASSERT_TRUE(released); + + // Should still have 5 blocks available + acquired = limiter.acquire(nullptr, 6, false); + ASSERT_EQ(acquired, 5); +} + +TEST_F(DefaultPrefetchRateLimiterTest, LargeNumbers) { + DefaultPrefetchRateLimiter limiter(1000000); + + // Acquire large number of blocks + size_t acquired = limiter.acquire(nullptr, 500000, false); + ASSERT_EQ(acquired, 500000); + + // Acquire more + acquired = limiter.acquire(nullptr, 600000, false); + ASSERT_EQ(acquired, 500000); // Should get remaining blocks + + // Release large number + bool released = limiter.release(800000); + ASSERT_TRUE(released); + + // Should be capped at max + acquired = limiter.acquire(nullptr, 1200000, false); + ASSERT_EQ(acquired, 800000); +} + +// Thread safety test +TEST_F(DefaultPrefetchRateLimiterTest, ThreadSafety) { + DefaultPrefetchRateLimiter limiter(1000); + std::atomic total_acquired{0}; + std::atomic total_released{0}; + + const int num_threads = 10; + const int operations_per_thread = 10000; + Barrier 
barrier(num_threads); + + std::vector threads; + + // Create threads that acquire and release blocks + threads.reserve(num_threads); + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back( + [&barrier, &limiter, &total_acquired, &total_released]() { + barrier.arrive_and_wait(); + + for (int j = 0; j < operations_per_thread; ++j) { + // Acquire some blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + total_acquired.fetch_add(acquired); + + // Release some blocks + if (acquired > 0) { + bool released = limiter.release(acquired); + if (released) { + total_released.fetch_add(acquired); + } + } + } + }); + } + + // Wait for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + // Verify that total released equals total acquired + ASSERT_EQ(total_acquired.load(), total_released.load()); + + // Verify that we can still acquire the maximum number of blocks + size_t final_acquired = limiter.acquire(nullptr, 1000, true); + ASSERT_EQ(final_acquired, 1000); +} + +// Test concurrent acquire operations +TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentAcquire) { + DefaultPrefetchRateLimiter limiter(100); + std::atomic total_acquired{0}; + + const int num_threads = 20; + std::vector threads; + + // Create threads that try to acquire blocks concurrently + threads.reserve(num_threads); + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back([&limiter, &total_acquired]() { + size_t acquired = limiter.acquire(nullptr, 10, false); + total_acquired.fetch_add(acquired); + }); + } + + // Wait for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + // Total acquired should not exceed the limit + ASSERT_LE(total_acquired.load(), 100); + + // The sum should be exactly 100 since we have enough demand + ASSERT_EQ(total_acquired.load(), 100); +} + +// Test concurrent release operations +TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentRelease) { + DefaultPrefetchRateLimiter limiter(100000); + + // First 
acquire all blocks + size_t acquired = limiter.acquire(nullptr, 100000, false); + ASSERT_EQ(acquired, 100000); + + const int num_threads = 10; + std::vector threads; + Barrier barrier(num_threads); + + // Create threads that try to release blocks concurrently + threads.reserve(num_threads); + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back([&barrier, &limiter]() { + barrier.arrive_and_wait(); + + for (int t = 0; t < 10000; ++t) { + limiter.release(1); + } + }); + } + + // Wait for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + // Should be able to acquire the maximum again (releases should be capped) + acquired = limiter.acquire(nullptr, 100000, false); + ASSERT_EQ(acquired, 100000); // Should be capped at max_blocks_ +} + +// Integration tests for PrefetchRateLimiter with MultiScan and Prepare +class PrefetchRateLimiterIntegrationTest + : public BlockBasedTableReaderBaseTest { + protected: + void SetUp() override { BlockBasedTableReaderBaseTest::SetUp(); } + + void ConfigureTableFactory() override { + BlockBasedTableOptions opts; + opts.index_type = BlockBasedTableOptions::IndexType::kBinarySearch; + opts.no_block_cache = false; + opts.filter_policy.reset(NewBloomFilterPolicy(10, false)); + options_.table_factory.reset( + static_cast(NewBlockBasedTableFactory(opts))); + options_.prefix_extractor = + std::shared_ptr(NewFixedPrefixTransform(3)); + } +}; + +// Mock PrefetchRateLimiter to track acquire/release calls +class MockPrefetchRateLimiter : public PrefetchRateLimiter { + public: + explicit MockPrefetchRateLimiter(size_t max_bytes) + : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} + + size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) override { + (void)table; // Suppress unused parameter warning + acquire_calls_++; + total_bytes_requested_ += bytes; + + if (deny_all_requests_) { + return 0; + } + + size_t current = cur_bytes_.load(); + if (current == 0) { + return 0; + } + + 
size_t to_acquire; + if (all_or_nothing) { + if (current >= bytes) { + to_acquire = bytes; + } else { + return 0; + } + } else { + to_acquire = std::min(current, bytes); + } + + cur_bytes_.fetch_sub(to_acquire); + total_bytes_acquired_ += to_acquire; + return to_acquire; + } + + bool release(size_t bytes) override { + release_calls_++; + total_bytes_released_ += bytes; + + size_t current = cur_bytes_.load(); + size_t new_value = std::min(max_bytes_, current + bytes); + cur_bytes_.store(new_value); + return true; + } + + // Test accessors + size_t GetAcquireCalls() const { return acquire_calls_; } + size_t GetReleaseCalls() const { return release_calls_; } + size_t GetTotalBytesRequested() const { return total_bytes_requested_; } + size_t GetTotalBytesAcquired() const { return total_bytes_acquired_; } + size_t GetTotalBytesReleased() const { return total_bytes_released_; } + size_t GetCurrentBytes() const { return cur_bytes_.load(); } + + void SetDenyAllRequests(bool deny) { deny_all_requests_ = deny; } + void Reset() { + acquire_calls_ = 0; + release_calls_ = 0; + total_bytes_requested_ = 0; + total_bytes_acquired_ = 0; + total_bytes_released_ = 0; + deny_all_requests_ = false; + cur_bytes_ = max_bytes_; + } + + private: + const size_t max_bytes_; + std::atomic cur_bytes_; + std::atomic acquire_calls_{0}; + std::atomic release_calls_{0}; + std::atomic total_bytes_requested_{0}; + std::atomic total_bytes_acquired_{0}; + std::atomic total_bytes_released_{0}; + std::atomic deny_all_requests_{false}; +}; + +TEST_F(PrefetchRateLimiterIntegrationTest, BasicRateLimitingInPrepare) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 20 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_BasicRateLimiting"; + ImmutableOptions ioptions(options); + 
CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter with limited capacity + auto mock_limiter = + std::make_shared(50000); // 50KB limit + + // Create MultiScanArgs with rate limiter + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + scan_options.insert(ExtractUserKey(kv[10 * kEntriesPerBlock].first), + ExtractUserKey(kv[15 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - this should trigger rate limiter calls + iter->Prepare(&scan_options); + + // Verify that the rate limiter was called + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesAcquired(), 0); + + // Verify that we didn't exceed the rate limit + ASSERT_LE(mock_limiter->GetTotalBytesAcquired(), 50000); + + // Test that iteration works + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterDeniesAllRequests) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 10 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + 
"PrefetchRateLimiterIntegrationTest_DeniesAllRequests"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter that denies all requests + auto mock_limiter = std::make_shared(50000); + mock_limiter->SetDenyAllRequests(true); + + // Create MultiScanArgs with rate limiter + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - this should trigger rate limiter calls but get denied + iter->Prepare(&scan_options); + + // Verify that the rate limiter was called but no bytes were acquired + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + ASSERT_EQ(mock_limiter->GetTotalBytesAcquired(), 0); + + // Iterator should still work (will read blocks on demand) + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterPartialAllocation) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 20 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_PartialAllocation"; + 
ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter with very limited capacity (only enough for a few + // blocks) + auto mock_limiter = + std::make_shared(8192); // 8KB limit + + // Create MultiScanArgs with rate limiter requesting many blocks + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[10 * kEntriesPerBlock].first)); + scan_options.insert(ExtractUserKey(kv[12 * kEntriesPerBlock].first), + ExtractUserKey(kv[19 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - this should trigger rate limiter calls + iter->Prepare(&scan_options); + + // Verify that the rate limiter was called + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + + // Should have acquired some bytes but not all requested + ASSERT_GT(mock_limiter->GetTotalBytesAcquired(), 0); + ASSERT_LT(mock_limiter->GetTotalBytesAcquired(), + mock_limiter->GetTotalBytesRequested()); + + // Should not exceed the rate limit + ASSERT_LE(mock_limiter->GetTotalBytesAcquired(), 8192); + + // Iterator should still work + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterReleaseOnDestroy) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz 
= options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 10 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_ReleaseOnDestroy"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter + auto mock_limiter = std::make_shared(50000); + size_t initial_bytes = mock_limiter->GetCurrentBytes(); + + { + // Create MultiScanArgs with rate limiter in a scope + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare + iter->Prepare(&scan_options); + + // Verify some bytes were acquired + ASSERT_GT(mock_limiter->GetTotalBytesAcquired(), 0); + ASSERT_LT(mock_limiter->GetCurrentBytes(), initial_bytes); + + // Use the iterator + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + + // Iterator goes out of scope here, should trigger releases + } + + // After iterator destruction, verify that release was called + ASSERT_GT(mock_limiter->GetReleaseCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesReleased(), 0); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, MultipleIteratorsShareRateLimiter) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = 
options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 20 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_MultipleIterators"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a shared rate limiter with limited capacity + auto mock_limiter = std::make_shared(100000); + + ReadOptions read_opts; + + // Create first iterator + std::unique_ptr iter1; + iter1.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + MultiScanArgs scan_options1(BytewiseComparator()); + scan_options1.prefetch_rate_limiter = mock_limiter; + scan_options1.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + // Create second iterator + std::unique_ptr iter2; + iter2.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + MultiScanArgs scan_options2(BytewiseComparator()); + scan_options2.prefetch_rate_limiter = mock_limiter; + scan_options2.insert(ExtractUserKey(kv[10 * kEntriesPerBlock].first), + ExtractUserKey(kv[15 * kEntriesPerBlock].first)); + + // Prepare both iterators - they should compete for the same rate limiter + iter1->Prepare(&scan_options1); + size_t bytes_after_first = mock_limiter->GetTotalBytesAcquired(); + + iter2->Prepare(&scan_options2); + size_t bytes_after_second = mock_limiter->GetTotalBytesAcquired(); + + // Verify both iterators used the rate 
limiter + ASSERT_GT(bytes_after_first, 0); + ASSERT_GT(bytes_after_second, bytes_after_first); + + // Total should not exceed the limit + ASSERT_LE(bytes_after_second, 100000); + + // Both iterators should work + iter1->Seek(kv[0].first); + ASSERT_TRUE(iter1->Valid()); + ASSERT_EQ(iter1->key().ToString(), kv[0].first); + + iter2->Seek(kv[10 * kEntriesPerBlock].first); + ASSERT_TRUE(iter2->Valid()); + ASSERT_EQ(iter2->key().ToString(), kv[10 * kEntriesPerBlock].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterWithAllOrNothingMode) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 10 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = "PrefetchRateLimiterIntegrationTest_AllOrNothing"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter with capacity for exactly one block + auto mock_limiter = + std::make_shared(4096); // 4KB limit + + // Create MultiScanArgs requesting multiple blocks + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - should use all-or-nothing mode for some blocks + iter->Prepare(&scan_options); + + // 
Verify that the rate limiter was called + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + + // Should have acquired some bytes but likely not all due to all-or-nothing + // failures + ASSERT_LE(mock_limiter->GetTotalBytesAcquired(), 4096); + + // Iterator should still work + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {