From b27217e01f85d6fe5fb4f483cbc42cfe55e93d79 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 10:19:22 -0700 Subject: [PATCH 01/14] Initial Design of PrefetchRateLimiter --- include/rocksdb/options.h | 33 + .../block_based/block_based_table_iterator.cc | 74 ++ .../block_based/block_based_table_iterator.h | 12 +- .../block_based_table_reader_test.cc | 680 ++++++++++++++++++ 4 files changed, 798 insertions(+), 1 deletion(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 206085b208a..a0670ca3dec 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1776,6 +1776,33 @@ struct ScanOptions { : range(_start, _upper_bound) {} }; +class BlockBasedTable; +class PrefetchRateLimiter { + public: + PrefetchRateLimiter() = default; + virtual ~PrefetchRateLimiter() = default; + + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) = 0; + virtual bool release(size_t bytes) = 0; +}; + +class DefaultPrefetchRateLimiter : public PrefetchRateLimiter { + public: + DefaultPrefetchRateLimiter() = default; + DefaultPrefetchRateLimiter(size_t max_bytes) + : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} + virtual ~DefaultPrefetchRateLimiter() = default; + + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) override; + virtual bool release(size_t bytes) override; + + private: + const size_t max_bytes_; + std::atomic cur_bytes_; +}; + // Container for multiple scan ranges that can be used with MultiScan. // This replaces std::vector with a more efficient implementation // that can merge overlapping ranges. @@ -1849,6 +1876,12 @@ class MultiScanArgs { uint64_t io_coalesce_threshold = 16 << 10; // 16KB by default + using RateLimiter = std::optional>; + RateLimiter prefetch_rate_limiter; + PrefetchRateLimiter& GetMutablePrefetchRateLimiter() const { + return *prefetch_rate_limiter.value().get(); + } + private: // The comparator used for ordering ranges const Comparator* comp_; diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index a8d821e2c32..5003fe23f33 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -10,6 +10,56 @@ namespace ROCKSDB_NAMESPACE { +size_t DefaultPrefetchRateLimiter::acquire(const BlockBasedTable* /*unused*/, + size_t bytes, bool all_or_nothing) { + bool done = false; + size_t amount = 0; + // Quick check if we have nothing. + if (cur_bytes_ == 0) { + return amount; + } + while (!done) { + // Check again here. + size_t current = cur_bytes_.load(std::memory_order_acquire); + if (current == 0) { + amount = 0; + return amount; + } + if (all_or_nothing) { + if (current >= bytes) { + done = cur_bytes_.compare_exchange_weak(current, current - bytes); + amount = bytes; + } else { + amount = 0; + return amount; + } + } else { + if (current > bytes) { + done = cur_bytes_.compare_exchange_weak(current, current - bytes); + amount = bytes; + } else { + done = cur_bytes_.compare_exchange_weak(current, 0); + amount = current; + } + } + } + return amount; +} + +bool DefaultPrefetchRateLimiter::release(size_t bytes) { + bool done = false; + while (!done) { + // Check again here. 
+ size_t current = cur_bytes_.load(std::memory_order_acq_rel); + if (current + bytes >= max_bytes_) { + done = cur_bytes_.compare_exchange_weak(current, max_bytes_); + } else { + done = cur_bytes_.compare_exchange_weak(current, current + bytes); + } + } + return true; +} + void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); } void BlockBasedTableIterator::Seek(const Slice& target) { @@ -984,6 +1034,7 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) { std::vector blocks_to_prepare; Status s; std::vector> block_ranges_per_scan; + total_acquired_ = 0; for (const auto& scan_opt : *scan_opts) { size_t num_blocks = 0; // Current scan overlap the last block of the previous scan. @@ -1000,6 +1051,16 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) { index_iter_->user_key(), /*a_has_ts*/ true, *scan_opt.range.limit, /*b_has_ts=*/false) <= 0)) { + // Lets make sure we are rate limited on how many blocks to prepare + if (multiscan_opts->prefetch_rate_limiter) { + auto blocks = multiscan_opts->GetMutablePrefetchRateLimiter().acquire( + table_, index_iter_->value().handle.size(), true); + total_acquired_ += blocks; + if (blocks == 0) { + break; + } + } + if (check_overlap && blocks_to_prepare.back() == index_iter_->value().handle) { // Skip the current block since it's already in the list @@ -1162,6 +1223,10 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) { } } + if (read_reqs.size() == 0) { + return; + } + AlignedBuf aligned_buf; s = table_->get_rep()->file.get()->MultiRead( io_opts, read_reqs.data(), read_reqs.size(), @@ -1345,6 +1410,15 @@ void BlockBasedTableIterator::FindBlockForwardInMultiScan() { } // Move to the next pinned data block ResetDataIter(); + if (multi_scan_->scan_opts->prefetch_rate_limiter) { + size_t releasing = + multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx] + .GetValue() + ->size(); + multi_scan_->scan_opts->GetMutablePrefetchRateLimiter().release( + releasing); + total_acquired_ -= releasing; + } ++multi_scan_->cur_data_block_idx; table_->NewDataBlockIterator( read_options_, diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index d31296fcf84..1d68ad9e808 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -16,6 +16,7 @@ #include "table/block_based/reader_common.h" namespace ROCKSDB_NAMESPACE { + // Iterates over the contents of BlockBasedTable. 
class BlockBasedTableIterator : public InternalIteratorBase { // compaction_readahead_size: its value will only be used if for_compaction = @@ -47,7 +48,14 @@ class BlockBasedTableIterator : public InternalIteratorBase { is_last_level_(table->IsLastLevel()), block_iter_points_to_real_block_(false) {} - ~BlockBasedTableIterator() override { ClearBlockHandles(); } + ~BlockBasedTableIterator() override { + if (multi_scan_ && multi_scan_->scan_opts->prefetch_rate_limiter) { + multi_scan_->scan_opts->GetMutablePrefetchRateLimiter().release( + total_acquired_); + } + + ClearBlockHandles(); + } void Seek(const Slice& target) override; void SeekForPrev(const Slice& target) override; @@ -373,6 +381,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { // *** END States used by both regular scan and multiscan // *** BEGIN MultiScan related states *** + size_t total_acquired_ = 0; + struct MultiScanState { // bool prepared_ = false; const MultiScanArgs* scan_opts; diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 6f22965eb7d..a046bf9e52e 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -1231,6 +1231,686 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(1, 2), ::testing::Values(0), ::testing::Values(false))); +// Tests for DefaultPrefetchRateLimiter +class DefaultPrefetchRateLimiterTest : public testing::Test { + public: + DefaultPrefetchRateLimiterTest() {} + ~DefaultPrefetchRateLimiterTest() override {} +}; + +TEST_F(DefaultPrefetchRateLimiterTest, BasicAcquireRelease) { + DefaultPrefetchRateLimiter limiter(10); + + // Test basic acquire + size_t acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 5); + + // Test release + bool released = limiter.release(3); + ASSERT_TRUE(released); + + // Test acquire after partial release + acquired = limiter.acquire(nullptr, 4, false); + ASSERT_EQ(acquired, 4); +} + +TEST_F(DefaultPrefetchRateLimiterTest, AllOrNothingTrue) { + DefaultPrefetchRateLimiter limiter(10); + + // First acquire some blocks + size_t acquired = limiter.acquire(nullptr, 6, false); + ASSERT_EQ(acquired, 6); + + // Try to acquire more than available with all_or_nothing=true + acquired = limiter.acquire(nullptr, 5, true); + ASSERT_EQ(acquired, 0); // Should fail and return 0 + + // Try to acquire exactly what's available with all_or_nothing=true + acquired = limiter.acquire(nullptr, 4, true); + ASSERT_EQ(acquired, 4); // Should succeed +} + +TEST_F(DefaultPrefetchRateLimiterTest, AllOrNothingFalse) { + DefaultPrefetchRateLimiter limiter(10); + + // First acquire some blocks + size_t acquired = limiter.acquire(nullptr, 7, false); + ASSERT_EQ(acquired, 7); + + // Try to acquire more than available with all_or_nothing=false + acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 3); // Should return what's available + + // Now no blocks should be available + acquired = limiter.acquire(nullptr, 1, false); + ASSERT_EQ(acquired, 0); +} + +TEST_F(DefaultPrefetchRateLimiterTest, ReleaseExceedsMax) { + DefaultPrefetchRateLimiter limiter(10); + + // Acquire all blocks + size_t acquired = limiter.acquire(nullptr, 10, false); + ASSERT_EQ(acquired, 10); + + // Release more than what was acquired + bool released = limiter.release(15); + ASSERT_TRUE(released); + + // Should be capped at max_blocks_ + acquired = limiter.acquire(nullptr, 12, false); + ASSERT_EQ(acquired, 10); // Should only get max_blocks_ +} + 
+TEST_F(DefaultPrefetchRateLimiterTest, ZeroBlocksAvailable) { + DefaultPrefetchRateLimiter limiter(5); + + // Acquire all blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 5); + + // Try to acquire when none available + acquired = limiter.acquire(nullptr, 1, false); + ASSERT_EQ(acquired, 0); + + acquired = limiter.acquire(nullptr, 1, true); + ASSERT_EQ(acquired, 0); +} + +TEST_F(DefaultPrefetchRateLimiterTest, AcquireZeroBlocks) { + DefaultPrefetchRateLimiter limiter(10); + + // Acquire zero blocks + size_t acquired = limiter.acquire(nullptr, 0, false); + ASSERT_EQ(acquired, 0); + + acquired = limiter.acquire(nullptr, 0, true); + ASSERT_EQ(acquired, 0); + + // All blocks should still be available + acquired = limiter.acquire(nullptr, 10, false); + ASSERT_EQ(acquired, 10); +} + +TEST_F(DefaultPrefetchRateLimiterTest, ReleaseZeroBlocks) { + DefaultPrefetchRateLimiter limiter(10); + + // Acquire some blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + ASSERT_EQ(acquired, 5); + + // Release zero blocks + bool released = limiter.release(0); + ASSERT_TRUE(released); + + // Should still have 5 blocks available + acquired = limiter.acquire(nullptr, 6, false); + ASSERT_EQ(acquired, 5); +} + +TEST_F(DefaultPrefetchRateLimiterTest, LargeNumbers) { + DefaultPrefetchRateLimiter limiter(1000000); + + // Acquire large number of blocks + size_t acquired = limiter.acquire(nullptr, 500000, false); + ASSERT_EQ(acquired, 500000); + + // Acquire more + acquired = limiter.acquire(nullptr, 600000, false); + ASSERT_EQ(acquired, 500000); // Should get remaining blocks + + // Release large number + bool released = limiter.release(800000); + ASSERT_TRUE(released); + + // Should be capped at max + acquired = limiter.acquire(nullptr, 1200000, false); + ASSERT_EQ(acquired, 800000); +} + +// Thread safety test +TEST_F(DefaultPrefetchRateLimiterTest, ThreadSafety) { + DefaultPrefetchRateLimiter limiter(1000); + std::atomic total_acquired{0}; + std::atomic total_released{0}; + + const int num_threads = 10; + const int operations_per_thread = 10000; + + std::vector threads; + + // Create threads that acquire and release blocks + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back( + [&limiter, &total_acquired, &total_released, operations_per_thread]() { + for (int j = 0; j < operations_per_thread; ++j) { + // Acquire some blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + total_acquired.fetch_add(acquired); + + // Release some blocks + if (acquired > 0) { + bool released = limiter.release(acquired); + if (released) { + total_released.fetch_add(acquired); + } + } + } + }); + } + + // Wait for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + // Verify that total released equals total acquired + ASSERT_EQ(total_acquired.load(), total_released.load()); + + // Verify that we can still acquire the maximum number of blocks + size_t final_acquired = limiter.acquire(nullptr, 1000, true); + ASSERT_EQ(final_acquired, 1000); +} + +// Test concurrent acquire operations +TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentAcquire) { + DefaultPrefetchRateLimiter limiter(100); + std::atomic total_acquired{0}; + + const int num_threads = 20; + std::vector threads; + + // Create threads that try to acquire blocks concurrently + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back([&limiter, &total_acquired]() { + size_t acquired = limiter.acquire(nullptr, 10, false); + total_acquired.fetch_add(acquired); + }); + } + + // Wait 
for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + // Total acquired should not exceed the limit + ASSERT_LE(total_acquired.load(), 100); + + // The sum should be exactly 100 since we have enough demand + ASSERT_EQ(total_acquired.load(), 100); +} + +// Test concurrent release operations +TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentRelease) { + DefaultPrefetchRateLimiter limiter(50); + + // First acquire all blocks + size_t acquired = limiter.acquire(nullptr, 50, false); + ASSERT_EQ(acquired, 50); + + const int num_threads = 10; + std::vector threads; + + // Create threads that try to release blocks concurrently + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back([&limiter]() { limiter.release(10); }); + } + + // Wait for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + // Should be able to acquire the maximum again (releases should be capped) + acquired = limiter.acquire(nullptr, 60, false); + ASSERT_EQ(acquired, 50); // Should be capped at max_blocks_ +} + +// Integration tests for PrefetchRateLimiter with MultiScan and Prepare +class PrefetchRateLimiterIntegrationTest + : public BlockBasedTableReaderBaseTest { + protected: + void SetUp() override { BlockBasedTableReaderBaseTest::SetUp(); } + + void ConfigureTableFactory() override { + BlockBasedTableOptions opts; + opts.index_type = BlockBasedTableOptions::IndexType::kBinarySearch; + opts.no_block_cache = false; + opts.filter_policy.reset(NewBloomFilterPolicy(10, false)); + options_.table_factory.reset( + static_cast(NewBlockBasedTableFactory(opts))); + options_.prefix_extractor = + std::shared_ptr(NewFixedPrefixTransform(3)); + } +}; + +// Mock PrefetchRateLimiter to track acquire/release calls +class MockPrefetchRateLimiter : public PrefetchRateLimiter { + public: + MockPrefetchRateLimiter(size_t max_bytes) + : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} + + size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) override { + (void)table; // Suppress unused parameter warning + acquire_calls_++; + total_bytes_requested_ += bytes; + + if (deny_all_requests_) { + return 0; + } + + size_t current = cur_bytes_.load(); + if (current == 0) { + return 0; + } + + size_t to_acquire; + if (all_or_nothing) { + if (current >= bytes) { + to_acquire = bytes; + } else { + return 0; + } + } else { + to_acquire = std::min(current, bytes); + } + + cur_bytes_.fetch_sub(to_acquire); + total_bytes_acquired_ += to_acquire; + return to_acquire; + } + + bool release(size_t bytes) override { + release_calls_++; + total_bytes_released_ += bytes; + + size_t current = cur_bytes_.load(); + size_t new_value = std::min(max_bytes_, current + bytes); + cur_bytes_.store(new_value); + return true; + } + + // Test accessors + size_t GetAcquireCalls() const { return acquire_calls_; } + size_t GetReleaseCalls() const { return release_calls_; } + size_t GetTotalBytesRequested() const { return total_bytes_requested_; } + size_t GetTotalBytesAcquired() const { return total_bytes_acquired_; } + size_t GetTotalBytesReleased() const { return total_bytes_released_; } + size_t GetCurrentBytes() const { return cur_bytes_.load(); } + + void SetDenyAllRequests(bool deny) { deny_all_requests_ = deny; } + void Reset() { + acquire_calls_ = 0; + release_calls_ = 0; + total_bytes_requested_ = 0; + total_bytes_acquired_ = 0; + total_bytes_released_ = 0; + deny_all_requests_ = false; + cur_bytes_ = max_bytes_; + } + + private: + const size_t max_bytes_; + std::atomic 
cur_bytes_; + std::atomic acquire_calls_{0}; + std::atomic release_calls_{0}; + std::atomic total_bytes_requested_{0}; + std::atomic total_bytes_acquired_{0}; + std::atomic total_bytes_released_{0}; + std::atomic deny_all_requests_{false}; +}; + +TEST_F(PrefetchRateLimiterIntegrationTest, BasicRateLimitingInPrepare) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 20 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_BasicRateLimiting"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter with limited capacity + auto mock_limiter = + std::make_shared(50000); // 50KB limit + + // Create MultiScanArgs with rate limiter + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + scan_options.insert(ExtractUserKey(kv[10 * kEntriesPerBlock].first), + ExtractUserKey(kv[15 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - this should trigger rate limiter calls + iter->Prepare(&scan_options); + + // Verify that the rate limiter was called + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesAcquired(), 0); + + // Verify that we didn't exceed the rate limit + ASSERT_LE(mock_limiter->GetTotalBytesAcquired(), 50000); + + // Test that iteration works + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterDeniesAllRequests) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 10 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_DeniesAllRequests"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter that denies all requests + auto mock_limiter = std::make_shared(50000); + mock_limiter->SetDenyAllRequests(true); + + // Create MultiScanArgs with rate limiter + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + ReadOptions 
read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - this should trigger rate limiter calls but get denied + iter->Prepare(&scan_options); + + // Verify that the rate limiter was called but no bytes were acquired + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + ASSERT_EQ(mock_limiter->GetTotalBytesAcquired(), 0); + + // Iterator should still work (will read blocks on demand) + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterPartialAllocation) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 20 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_PartialAllocation"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter with very limited capacity (only enough for a few + // blocks) + auto mock_limiter = + std::make_shared(8192); // 8KB limit + + // Create MultiScanArgs with rate limiter requesting many blocks + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[10 * kEntriesPerBlock].first)); + scan_options.insert(ExtractUserKey(kv[12 * kEntriesPerBlock].first), + ExtractUserKey(kv[19 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - this should trigger rate limiter calls + iter->Prepare(&scan_options); + + // Verify that the rate limiter was called + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + + // Should have acquired some bytes but not all requested + ASSERT_GT(mock_limiter->GetTotalBytesAcquired(), 0); + ASSERT_LT(mock_limiter->GetTotalBytesAcquired(), + mock_limiter->GetTotalBytesRequested()); + + // Should not exceed the rate limit + ASSERT_LE(mock_limiter->GetTotalBytesAcquired(), 8192); + + // Iterator should still work + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterReleaseOnDestroy) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 10 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_ReleaseOnDestroy"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + 
std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter + auto mock_limiter = std::make_shared(50000); + size_t initial_bytes = mock_limiter->GetCurrentBytes(); + + { + // Create MultiScanArgs with rate limiter in a scope + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare + iter->Prepare(&scan_options); + + // Verify some bytes were acquired + ASSERT_GT(mock_limiter->GetTotalBytesAcquired(), 0); + ASSERT_LT(mock_limiter->GetCurrentBytes(), initial_bytes); + + // Use the iterator + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + + // Iterator goes out of scope here, should trigger releases + } + + // After iterator destruction, verify that release was called + ASSERT_GT(mock_limiter->GetReleaseCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesReleased(), 0); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, MultipleIteratorsShareRateLimiter) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 20 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = + "PrefetchRateLimiterIntegrationTest_MultipleIterators"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a shared rate limiter with limited capacity + auto mock_limiter = std::make_shared(100000); + + ReadOptions read_opts; + + // Create first iterator + std::unique_ptr iter1; + iter1.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + MultiScanArgs scan_options1(BytewiseComparator()); + scan_options1.prefetch_rate_limiter = mock_limiter; + scan_options1.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + // Create second iterator + std::unique_ptr iter2; + iter2.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + MultiScanArgs scan_options2(BytewiseComparator()); + scan_options2.prefetch_rate_limiter = mock_limiter; + scan_options2.insert(ExtractUserKey(kv[10 * kEntriesPerBlock].first), + ExtractUserKey(kv[15 * kEntriesPerBlock].first)); + + // Prepare both iterators - they should compete for the same rate limiter + iter1->Prepare(&scan_options1); + size_t bytes_after_first = mock_limiter->GetTotalBytesAcquired(); + + iter2->Prepare(&scan_options2); + size_t bytes_after_second = mock_limiter->GetTotalBytesAcquired(); + + // Verify both 
iterators used the rate limiter + ASSERT_GT(bytes_after_first, 0); + ASSERT_GT(bytes_after_second, bytes_after_first); + + // Total should not exceed the limit + ASSERT_LE(bytes_after_second, 100000); + + // Both iterators should work + iter1->Seek(kv[0].first); + ASSERT_TRUE(iter1->Valid()); + ASSERT_EQ(iter1->key().ToString(), kv[0].first); + + iter2->Seek(kv[10 * kEntriesPerBlock].first); + ASSERT_TRUE(iter2->Valid()); + ASSERT_EQ(iter2->key().ToString(), kv[10 * kEntriesPerBlock].first); +} + +TEST_F(PrefetchRateLimiterIntegrationTest, RateLimiterWithAllOrNothingMode) { + Options options; + options.statistics = CreateDBStatistics(); + size_t ts_sz = options.comparator->timestamp_size(); + std::vector> kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 10 /* num_block */, true /* mixed_with_human_readable_string_value */, + ts_sz); + + std::string table_name = "PrefetchRateLimiterIntegrationTest_AllOrNothing"; + ImmutableOptions ioptions(options); + CreateTable(table_name, ioptions, CompressionType::kNoCompression, kv); + + std::unique_ptr table; + FileOptions foptions; + foptions.use_direct_reads = true; + InternalKeyComparator comparator(options.comparator); + NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table, + true /* prefetch_index_and_filter_in_cache */); + + // Create a rate limiter with capacity for exactly one block + auto mock_limiter = + std::make_shared(4096); // 4KB limit + + // Create MultiScanArgs requesting multiple blocks + MultiScanArgs scan_options(BytewiseComparator()); + scan_options.prefetch_rate_limiter = mock_limiter; + scan_options.insert(ExtractUserKey(kv[0].first), + ExtractUserKey(kv[5 * kEntriesPerBlock].first)); + + ReadOptions read_opts; + std::unique_ptr iter; + iter.reset(table->NewIterator( + read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + + // Call Prepare - should use all-or-nothing mode for some blocks + iter->Prepare(&scan_options); + + // Verify that the rate limiter was called + ASSERT_GT(mock_limiter->GetAcquireCalls(), 0); + ASSERT_GT(mock_limiter->GetTotalBytesRequested(), 0); + + // Should have acquired some bytes but likely not all due to all-or-nothing + // failures + ASSERT_LE(mock_limiter->GetTotalBytesAcquired(), 4096); + + // Iterator should still work + iter->Seek(kv[0].first); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), kv[0].first); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { From f9ff96d7fdf63b2cc68c9beabf064be06a4c7e4d Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 10:35:03 -0700 Subject: [PATCH 02/14] Nits --- include/rocksdb/options.h | 55 +++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a0670ca3dec..97ea28c245f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1778,29 +1778,26 @@ struct ScanOptions { class BlockBasedTable; class PrefetchRateLimiter { - public: - PrefetchRateLimiter() = default; - virtual ~PrefetchRateLimiter() = default; + public: + PrefetchRateLimiter() = default; + virtual ~PrefetchRateLimiter() = default; - virtual size_t acquire(const BlockBasedTable* table, size_t bytes, - bool all_or_nothing) = 0; - virtual bool release(size_t bytes) = 0; + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, bool all_or_nothing) = 0; + virtual bool release(size_t bytes) = 0; }; -class 
DefaultPrefetchRateLimiter : public PrefetchRateLimiter { - public: - DefaultPrefetchRateLimiter() = default; - DefaultPrefetchRateLimiter(size_t max_bytes) - : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} - virtual ~DefaultPrefetchRateLimiter() = default; - - virtual size_t acquire(const BlockBasedTable* table, size_t bytes, - bool all_or_nothing) override; - virtual bool release(size_t bytes) override; - private: - const size_t max_bytes_; - std::atomic cur_bytes_; +class DefaultPrefetchRateLimiter : public PrefetchRateLimiter { + public: + DefaultPrefetchRateLimiter() = default; + DefaultPrefetchRateLimiter(size_t max_bytes) : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} + virtual ~DefaultPrefetchRateLimiter() = default; + + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, bool all_or_nothing) override; + virtual bool release(size_t bytes) override; + private: + const size_t max_bytes_; + std::atomic cur_bytes_; }; // Container for multiple scan ranges that can be used with MultiScan. @@ -1810,16 +1807,18 @@ class MultiScanArgs { public: // Constructor that takes a comparator explicit MultiScanArgs(const Comparator* comparator = BytewiseComparator()) - : comp_(comparator) {} + : prefetch_rate_limiter(), comp_(comparator) {} // Copy Constructor - MultiScanArgs(const MultiScanArgs& other) { - comp_ = other.comp_; - original_ranges_ = other.original_ranges_; - io_coalesce_threshold = other.io_coalesce_threshold; - } + MultiScanArgs(const MultiScanArgs& other) + : io_coalesce_threshold(other.io_coalesce_threshold), + prefetch_rate_limiter(other.prefetch_rate_limiter), + comp_(other.comp_), + original_ranges_(other.original_ranges_) {} + MultiScanArgs(MultiScanArgs&& other) noexcept : io_coalesce_threshold(other.io_coalesce_threshold), + prefetch_rate_limiter(std::move(other.prefetch_rate_limiter)), comp_(other.comp_), original_ranges_(std::move(other.original_ranges_)) {} @@ -1827,6 +1826,7 @@ class MultiScanArgs { comp_ = other.comp_; original_ranges_ = other.original_ranges_; io_coalesce_threshold = other.io_coalesce_threshold; + prefetch_rate_limiter = other.prefetch_rate_limiter; return *this; } @@ -1835,6 +1835,7 @@ class MultiScanArgs { comp_ = other.comp_; original_ranges_ = std::move(other.original_ranges_); io_coalesce_threshold = other.io_coalesce_threshold; + prefetch_rate_limiter = other.prefetch_rate_limiter; } return *this; } @@ -1876,8 +1877,10 @@ class MultiScanArgs { uint64_t io_coalesce_threshold = 16 << 10; // 16KB by default - using RateLimiter = std::optional>; + using RateLimiter = std::optional>; + RateLimiter prefetch_rate_limiter; + PrefetchRateLimiter& GetMutablePrefetchRateLimiter() const { return *prefetch_rate_limiter.value().get(); } From a0cb28f6b46f0415bb39af41c775e80445cd3b02 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 10:53:21 -0700 Subject: [PATCH 03/14] remove optional --- include/rocksdb/options.h | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 97ea28c245f..6cdde9eaacd 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1807,7 +1807,7 @@ class MultiScanArgs { public: // Constructor that takes a comparator explicit MultiScanArgs(const Comparator* comparator = BytewiseComparator()) - : prefetch_rate_limiter(), comp_(comparator) {} + : prefetch_rate_limiter(nullptr), comp_(comparator) {} // Copy Constructor MultiScanArgs(const MultiScanArgs& other) @@ -1823,10 +1823,12 @@ class 
MultiScanArgs { original_ranges_(std::move(other.original_ranges_)) {} MultiScanArgs& operator=(const MultiScanArgs& other) { - comp_ = other.comp_; - original_ranges_ = other.original_ranges_; - io_coalesce_threshold = other.io_coalesce_threshold; - prefetch_rate_limiter = other.prefetch_rate_limiter; + if (this != &other) { + comp_ = other.comp_; + original_ranges_ = other.original_ranges_; + io_coalesce_threshold = other.io_coalesce_threshold; + prefetch_rate_limiter = other.prefetch_rate_limiter; + } return *this; } @@ -1835,7 +1837,7 @@ class MultiScanArgs { comp_ = other.comp_; original_ranges_ = std::move(other.original_ranges_); io_coalesce_threshold = other.io_coalesce_threshold; - prefetch_rate_limiter = other.prefetch_rate_limiter; + prefetch_rate_limiter = std::move(other.prefetch_rate_limiter); } return *this; } @@ -1877,12 +1879,10 @@ class MultiScanArgs { uint64_t io_coalesce_threshold = 16 << 10; // 16KB by default - using RateLimiter = std::optional>; - - RateLimiter prefetch_rate_limiter; + std::shared_ptr prefetch_rate_limiter; PrefetchRateLimiter& GetMutablePrefetchRateLimiter() const { - return *prefetch_rate_limiter.value().get(); + return *prefetch_rate_limiter.get(); } private: From 6d494216b6905396f39801bb6916382971d615d4 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 10:57:43 -0700 Subject: [PATCH 04/14] Remove default constructor --- include/rocksdb/options.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 6cdde9eaacd..5b348396579 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1789,8 +1789,7 @@ class PrefetchRateLimiter { class DefaultPrefetchRateLimiter : public PrefetchRateLimiter { public: - DefaultPrefetchRateLimiter() = default; - DefaultPrefetchRateLimiter(size_t max_bytes) : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} + explicit DefaultPrefetchRateLimiter(size_t max_bytes) : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} virtual ~DefaultPrefetchRateLimiter() = default; virtual size_t acquire(const BlockBasedTable* table, size_t bytes, bool all_or_nothing) override; From f785f74ded7b0205b299fae5076f9055ee8c3c32 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 11:01:53 -0700 Subject: [PATCH 05/14] format --- include/rocksdb/options.h | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 5b348396579..7aad623522e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1778,25 +1778,28 @@ struct ScanOptions { class BlockBasedTable; class PrefetchRateLimiter { - public: - PrefetchRateLimiter() = default; - virtual ~PrefetchRateLimiter() = default; + public: + PrefetchRateLimiter() = default; + virtual ~PrefetchRateLimiter() = default; - virtual size_t acquire(const BlockBasedTable* table, size_t bytes, bool all_or_nothing) = 0; - virtual bool release(size_t bytes) = 0; + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) = 0; + virtual bool release(size_t bytes) = 0; }; - class DefaultPrefetchRateLimiter : public PrefetchRateLimiter { - public: - explicit DefaultPrefetchRateLimiter(size_t max_bytes) : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} - virtual ~DefaultPrefetchRateLimiter() = default; - - virtual size_t acquire(const BlockBasedTable* table, size_t bytes, bool all_or_nothing) override; - virtual bool release(size_t bytes) override; - 
private: - const size_t max_bytes_; - std::atomic cur_bytes_; + public: + explicit DefaultPrefetchRateLimiter(size_t max_bytes) + : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} + virtual ~DefaultPrefetchRateLimiter() = default; + + virtual size_t acquire(const BlockBasedTable* table, size_t bytes, + bool all_or_nothing) override; + virtual bool release(size_t bytes) override; + + private: + const size_t max_bytes_; + std::atomic cur_bytes_; }; // Container for multiple scan ranges that can be used with MultiScan. @@ -1809,7 +1812,7 @@ class MultiScanArgs { : prefetch_rate_limiter(nullptr), comp_(comparator) {} // Copy Constructor - MultiScanArgs(const MultiScanArgs& other) + MultiScanArgs(const MultiScanArgs& other) : io_coalesce_threshold(other.io_coalesce_threshold), prefetch_rate_limiter(other.prefetch_rate_limiter), comp_(other.comp_), From b164f7c62318d03db0cc0ec20f156352bb62437c Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 11:12:38 -0700 Subject: [PATCH 06/14] Nits --- table/block_based/block_based_table_iterator.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 5003fe23f33..726b199f8f5 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -20,7 +20,7 @@ size_t DefaultPrefetchRateLimiter::acquire(const BlockBasedTable* /*unused*/, } while (!done) { // Check again here. - size_t current = cur_bytes_.load(std::memory_order_acquire); + size_t current = cur_bytes_.load(); if (current == 0) { amount = 0; return amount; @@ -50,7 +50,7 @@ bool DefaultPrefetchRateLimiter::release(size_t bytes) { bool done = false; while (!done) { // Check again here. 
- size_t current = cur_bytes_.load(std::memory_order_acq_rel); + size_t current = cur_bytes_.load(); if (current + bytes >= max_bytes_) { done = cur_bytes_.compare_exchange_weak(current, max_bytes_); } else { From fa5df871fcfc143c93f7a3b9e4553562d7e8ca08 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 20:19:26 -0700 Subject: [PATCH 07/14] remove unneeded --- table/block_based/block_based_table_reader_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index a046bf9e52e..85d8d40cc41 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -1382,7 +1382,7 @@ TEST_F(DefaultPrefetchRateLimiterTest, ThreadSafety) { // Create threads that acquire and release blocks for (int i = 0; i < num_threads; ++i) { threads.emplace_back( - [&limiter, &total_acquired, &total_released, operations_per_thread]() { + [&limiter, &total_acquired, &total_released]() { for (int j = 0; j < operations_per_thread; ++j) { // Acquire some blocks size_t acquired = limiter.acquire(nullptr, 5, false); From 677f7c9a23d9b01b0e08d41b795e2ae6b45fbce6 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Thu, 28 Aug 2025 20:21:59 -0700 Subject: [PATCH 08/14] format --- .../block_based_table_reader_test.cc | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 85d8d40cc41..f31d8adfcb1 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -1381,22 +1381,21 @@ TEST_F(DefaultPrefetchRateLimiterTest, ThreadSafety) { // Create threads that acquire and release blocks for (int i = 0; i < num_threads; ++i) { - threads.emplace_back( - [&limiter, &total_acquired, &total_released]() { - for (int j = 0; j < operations_per_thread; ++j) { - // Acquire some blocks - size_t acquired = limiter.acquire(nullptr, 5, false); - total_acquired.fetch_add(acquired); - - // Release some blocks - if (acquired > 0) { - bool released = limiter.release(acquired); - if (released) { - total_released.fetch_add(acquired); - } - } + threads.emplace_back([&limiter, &total_acquired, &total_released]() { + for (int j = 0; j < operations_per_thread; ++j) { + // Acquire some blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + total_acquired.fetch_add(acquired); + + // Release some blocks + if (acquired > 0) { + bool released = limiter.release(acquired); + if (released) { + total_released.fetch_add(acquired); } - }); + } + } + }); } // Wait for all threads to complete From 7e5e964fc76e26ccf2541cfb099fb8792eb1f3e5 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Tue, 2 Sep 2025 14:40:45 -0700 Subject: [PATCH 09/14] Check scan_opts as well --- table/block_based/block_based_table_iterator.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index 1d68ad9e808..58a716353d0 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -49,7 +49,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { block_iter_points_to_real_block_(false) {} ~BlockBasedTableIterator() override { - if (multi_scan_ && multi_scan_->scan_opts->prefetch_rate_limiter) { + if (multi_scan_ && 
multi_scan_->scan_opts && + multi_scan_->scan_opts->prefetch_rate_limiter) { multi_scan_->scan_opts->GetMutablePrefetchRateLimiter().release( total_acquired_); } From 0d2f0e2fd68095208dfb8fffcd7af7752aaa6df3 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Tue, 2 Sep 2025 15:39:47 -0700 Subject: [PATCH 10/14] Possibly invalid scan_opts --- .../block_based/block_based_table_iterator.cc | 5 ++--- table/block_based/block_based_table_iterator.h | 18 +++++++++++------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 726b199f8f5..3a1a3a76b55 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -1410,13 +1410,12 @@ void BlockBasedTableIterator::FindBlockForwardInMultiScan() { } // Move to the next pinned data block ResetDataIter(); - if (multi_scan_->scan_opts->prefetch_rate_limiter) { + if (multi_scan_->prefetch_rate_limiter) { size_t releasing = multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx] .GetValue() ->size(); - multi_scan_->scan_opts->GetMutablePrefetchRateLimiter().release( - releasing); + multi_scan_->prefetch_rate_limiter->release(releasing); total_acquired_ -= releasing; } ++multi_scan_->cur_data_block_idx; diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index 58a716353d0..912f758b323 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -49,13 +49,13 @@ class BlockBasedTableIterator : public InternalIteratorBase { block_iter_points_to_real_block_(false) {} ~BlockBasedTableIterator() override { - if (multi_scan_ && multi_scan_->scan_opts && - multi_scan_->scan_opts->prefetch_rate_limiter) { - multi_scan_->scan_opts->GetMutablePrefetchRateLimiter().release( - total_acquired_); - } - ClearBlockHandles(); + + // Release any acquired bytes from the rate limiter if we have a multi_scan_ + // Use the stored rate limiter copy to avoid accessing potentially invalid scan_opts + if (multi_scan_ && multi_scan_->prefetch_rate_limiter && total_acquired_ > 0) { + multi_scan_->prefetch_rate_limiter->release(total_acquired_); + } } void Seek(const Slice& target) override; @@ -395,6 +395,9 @@ class BlockBasedTableIterator : public InternalIteratorBase { std::vector> block_ranges_per_scan; size_t next_scan_idx; size_t cur_data_block_idx; + + // Store the rate limiter separately to avoid accessing potentially invalid scan_opts + std::shared_ptr prefetch_rate_limiter; MultiScanState( const MultiScanArgs* _scan_opts, @@ -404,7 +407,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { pinned_data_blocks(std::move(_pinned_data_blocks)), block_ranges_per_scan(std::move(_block_ranges_per_scan)), next_scan_idx(0), - cur_data_block_idx(0) {} + cur_data_block_idx(0), + prefetch_rate_limiter(_scan_opts ? 
_scan_opts->prefetch_rate_limiter : nullptr) {} }; std::unique_ptr multi_scan_; From 63cdbed6727ad6e81a68c1c48c4e61d647982b73 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Tue, 2 Sep 2025 16:00:29 -0700 Subject: [PATCH 11/14] Format --- table/block_based/block_based_table_iterator.h | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index 912f758b323..32ef25e9170 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -50,10 +50,12 @@ class BlockBasedTableIterator : public InternalIteratorBase { ~BlockBasedTableIterator() override { ClearBlockHandles(); - + // Release any acquired bytes from the rate limiter if we have a multi_scan_ - // Use the stored rate limiter copy to avoid accessing potentially invalid scan_opts - if (multi_scan_ && multi_scan_->prefetch_rate_limiter && total_acquired_ > 0) { + // Use the stored rate limiter copy to avoid accessing potentially invalid + // scan_opts + if (multi_scan_ && multi_scan_->prefetch_rate_limiter && + total_acquired_ > 0) { multi_scan_->prefetch_rate_limiter->release(total_acquired_); } } @@ -395,8 +397,9 @@ class BlockBasedTableIterator : public InternalIteratorBase { std::vector> block_ranges_per_scan; size_t next_scan_idx; size_t cur_data_block_idx; - - // Store the rate limiter separately to avoid accessing potentially invalid scan_opts + + // Store the rate limiter separately to avoid accessing potentially invalid + // scan_opts std::shared_ptr prefetch_rate_limiter; MultiScanState( @@ -408,7 +411,8 @@ class BlockBasedTableIterator : public InternalIteratorBase { block_ranges_per_scan(std::move(_block_ranges_per_scan)), next_scan_idx(0), cur_data_block_idx(0), - prefetch_rate_limiter(_scan_opts ? _scan_opts->prefetch_rate_limiter : nullptr) {} + prefetch_rate_limiter(_scan_opts ? 
_scan_opts->prefetch_rate_limiter + : nullptr) {} }; std::unique_ptr multi_scan_; From 64e9780f1a031cea9a0f274aea81d4496ba860cf Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Tue, 2 Sep 2025 19:50:18 -0700 Subject: [PATCH 12/14] linters --- table/block_based/block_based_table_reader_test.cc | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index f31d8adfcb1..ad398455064 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -1236,6 +1236,15 @@ class DefaultPrefetchRateLimiterTest : public testing::Test { public: DefaultPrefetchRateLimiterTest() {} ~DefaultPrefetchRateLimiterTest() override {} + + // Explicitly delete copy and move operations to satisfy Rule of Five + DefaultPrefetchRateLimiterTest(const DefaultPrefetchRateLimiterTest&) = + delete; + DefaultPrefetchRateLimiterTest& operator=( + const DefaultPrefetchRateLimiterTest&) = delete; + DefaultPrefetchRateLimiterTest(DefaultPrefetchRateLimiterTest&&) = delete; + DefaultPrefetchRateLimiterTest& operator=(DefaultPrefetchRateLimiterTest&&) = + delete; }; TEST_F(DefaultPrefetchRateLimiterTest, BasicAcquireRelease) { @@ -1251,6 +1260,7 @@ TEST_F(DefaultPrefetchRateLimiterTest, BasicAcquireRelease) { // Test acquire after partial release acquired = limiter.acquire(nullptr, 4, false); + ASSERT_EQ(acquired, 4); } @@ -1380,6 +1390,7 @@ TEST_F(DefaultPrefetchRateLimiterTest, ThreadSafety) { std::vector threads; // Create threads that acquire and release blocks + threads.reserve(num_threads); for (int i = 0; i < num_threads; ++i) { threads.emplace_back([&limiter, &total_acquired, &total_released]() { for (int j = 0; j < operations_per_thread; ++j) { @@ -1420,6 +1431,7 @@ TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentAcquire) { std::vector threads; // Create threads that try to acquire blocks concurrently + threads.reserve(num_threads); for (int i = 0; i < num_threads; ++i) { threads.emplace_back([&limiter, &total_acquired]() { size_t acquired = limiter.acquire(nullptr, 10, false); @@ -1486,7 +1498,7 @@ class PrefetchRateLimiterIntegrationTest // Mock PrefetchRateLimiter to track acquire/release calls class MockPrefetchRateLimiter : public PrefetchRateLimiter { public: - MockPrefetchRateLimiter(size_t max_bytes) + explicit MockPrefetchRateLimiter(size_t max_bytes) : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} size_t acquire(const BlockBasedTable* table, size_t bytes, From 3d5aee2aa187b453d8be1870994da843cce81f10 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Wed, 3 Sep 2025 09:37:04 -0700 Subject: [PATCH 13/14] Also make sure to have proper barriers in tests --- include/rocksdb/options.h | 2 +- .../block_based_table_reader_test.cc | 54 ++++++++++++++++--- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 7aad623522e..dd6be2c9efd 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1791,7 +1791,7 @@ class DefaultPrefetchRateLimiter : public PrefetchRateLimiter { public: explicit DefaultPrefetchRateLimiter(size_t max_bytes) : max_bytes_(max_bytes), cur_bytes_(max_bytes) {} - virtual ~DefaultPrefetchRateLimiter() = default; + virtual ~DefaultPrefetchRateLimiter() override = default; virtual size_t acquire(const BlockBasedTable* table, size_t bytes, bool all_or_nothing) override; diff --git a/table/block_based/block_based_table_reader_test.cc 
b/table/block_based/block_based_table_reader_test.cc index ad398455064..7a5ff2034e0 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -30,6 +30,34 @@ namespace ROCKSDB_NAMESPACE { +// Simple barrier implementation for thread synchronization in tests +class Barrier { + public: + explicit Barrier(int count) : count_(count), waiting_(0), generation_(0) {} + + void arrive_and_wait() { + std::unique_lock lock(mutex_); + int gen = generation_; + + if (++waiting_ == count_) { + // Last thread to arrive - wake everyone up + waiting_ = 0; + ++generation_; + cv_.notify_all(); + } else { + // Wait for all threads to arrive + cv_.wait(lock, [this, gen] { return gen != generation_; }); + } + } + + private: + const int count_; + int waiting_; + int generation_; + std::mutex mutex_; + std::condition_variable cv_; +}; + class BlockBasedTableReaderBaseTest : public testing::Test { public: static constexpr int kBytesPerEntry = 256; @@ -1386,13 +1414,17 @@ TEST_F(DefaultPrefetchRateLimiterTest, ThreadSafety) { const int num_threads = 10; const int operations_per_thread = 10000; + Barrier barrier(num_threads); std::vector threads; // Create threads that acquire and release blocks threads.reserve(num_threads); for (int i = 0; i < num_threads; ++i) { - threads.emplace_back([&limiter, &total_acquired, &total_released]() { + threads.emplace_back([&barrier, &limiter, &total_acquired, &total_released, + operations_per_thread]() { + barrier.arrive_and_wait(); + for (int j = 0; j < operations_per_thread; ++j) { // Acquire some blocks size_t acquired = limiter.acquire(nullptr, 5, false); @@ -1453,18 +1485,26 @@ TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentAcquire) { // Test concurrent release operations TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentRelease) { - DefaultPrefetchRateLimiter limiter(50); + DefaultPrefetchRateLimiter limiter(100000); // First acquire all blocks - size_t acquired = limiter.acquire(nullptr, 50, false); - ASSERT_EQ(acquired, 50); + size_t acquired = limiter.acquire(nullptr, 100000, false); + ASSERT_EQ(acquired, 100000); const int num_threads = 10; std::vector threads; + Barrier barrier(num_threads); // Create threads that try to release blocks concurrently + threads.reserve(num_threads); for (int i = 0; i < num_threads; ++i) { - threads.emplace_back([&limiter]() { limiter.release(10); }); + threads.emplace_back([&barrier, &limiter]() { + barrier.arrive_and_wait(); + + for (int t = 0; t < 10000; ++t) { + limiter.release(1); + } + }); } // Wait for all threads to complete @@ -1473,8 +1513,8 @@ TEST_F(DefaultPrefetchRateLimiterTest, ConcurrentRelease) { } // Should be able to acquire the maximum again (releases should be capped) - acquired = limiter.acquire(nullptr, 60, false); - ASSERT_EQ(acquired, 50); // Should be capped at max_blocks_ + acquired = limiter.acquire(nullptr, 100000, false); + ASSERT_EQ(acquired, 100000); // Should be capped at max_blocks_ } // Integration tests for PrefetchRateLimiter with MultiScan and Prepare From 396b0d0a88cfd0de5a620b240da177208aaa5621 Mon Sep 17 00:00:00 2001 From: Ryan Hancock Date: Wed, 3 Sep 2025 09:44:32 -0700 Subject: [PATCH 14/14] linter --- .../block_based_table_reader_test.cc | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 7a5ff2034e0..52dfb2bd88f 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ 
b/table/block_based/block_based_table_reader_test.cc @@ -1421,24 +1421,24 @@ TEST_F(DefaultPrefetchRateLimiterTest, ThreadSafety) { // Create threads that acquire and release blocks threads.reserve(num_threads); for (int i = 0; i < num_threads; ++i) { - threads.emplace_back([&barrier, &limiter, &total_acquired, &total_released, - operations_per_thread]() { - barrier.arrive_and_wait(); - - for (int j = 0; j < operations_per_thread; ++j) { - // Acquire some blocks - size_t acquired = limiter.acquire(nullptr, 5, false); - total_acquired.fetch_add(acquired); - - // Release some blocks - if (acquired > 0) { - bool released = limiter.release(acquired); - if (released) { - total_released.fetch_add(acquired); + threads.emplace_back( + [&barrier, &limiter, &total_acquired, &total_released]() { + barrier.arrive_and_wait(); + + for (int j = 0; j < operations_per_thread; ++j) { + // Acquire some blocks + size_t acquired = limiter.acquire(nullptr, 5, false); + total_acquired.fetch_add(acquired); + + // Release some blocks + if (acquired > 0) { + bool released = limiter.release(acquired); + if (released) { + total_released.fetch_add(acquired); + } + } } - } - } - }); + }); } // Wait for all threads to complete
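
For orientation, below is a minimal usage sketch of the API this patch series introduces. It is not part of the diffs above: the function name PrepareWithSharedBudget, the 64KB budget, the key parameters, and the `iter` argument are illustrative placeholders. Only DefaultPrefetchRateLimiter, MultiScanArgs::prefetch_rate_limiter, MultiScanArgs::insert(), and the Prepare() call are taken from this patch (the wiring mirrors the integration tests in block_based_table_reader_test.cc, which obtain `iter` from BlockBasedTable::NewIterator).

#include <memory>

#include "rocksdb/comparator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"  // internal header, as used by the test file

namespace ROCKSDB_NAMESPACE {

// Sketch: share one prefetch byte budget across the ranges of a multi-scan.
// The 64KB budget and the key arguments are placeholders for illustration.
void PrepareWithSharedBudget(InternalIterator* iter, const Slice& start1,
                             const Slice& limit1, const Slice& start2,
                             const Slice& limit2) {
  // Budget shared by every MultiScanArgs that holds this shared_ptr.
  auto limiter = std::make_shared<DefaultPrefetchRateLimiter>(64 << 10);

  MultiScanArgs scan_args(BytewiseComparator());
  scan_args.prefetch_rate_limiter = limiter;  // shared_ptr; may be reused by other scans
  scan_args.insert(start1, limit1);
  scan_args.insert(start2, limit2);

  // Prepare() asks the limiter for each data block's size (all-or-nothing) and
  // stops prefetching once the budget is exhausted; bytes are released as
  // pinned blocks are consumed during the scan, and any remainder is released
  // when the iterator is destroyed.
  iter->Prepare(&scan_args);
}

}  // namespace ROCKSDB_NAMESPACE

Because the limiter is held by shared_ptr, several MultiScanArgs instances (and therefore several iterators) can draw from the same budget, which is the sharing behavior exercised by the MultipleIteratorsShareRateLimiter test above.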