From 7dfc00fed47e157221414ff59ada15527b59f2cd Mon Sep 17 00:00:00 2001
From: jinhelin
Date: Fri, 17 Nov 2023 20:29:18 +0800
Subject: [PATCH] Check RU in read thread of Storage. (#8386)

close pingcap/tiflash#8362
---
 dbms/src/Common/TiFlashMetrics.h              |  3 +
 .../ColumnFile/ColumnFileSetReader.cpp        | 23 ++++--
 .../Storages/DeltaMerge/DeltaMergeStore.cpp   | 12 ++-
 .../DeltaMerge/ReadThread/MergedTask.cpp      | 11 ++-
 .../ReadThread/SegmentReadTaskScheduler.cpp   | 40 ++++++++--
 dbms/src/Storages/DeltaMerge/Segment.h        |  7 ++
 .../DeltaMerge/SegmentReadTaskPool.cpp        | 75 ++++++++++++++++++-
 .../Storages/DeltaMerge/SegmentReadTaskPool.h | 15 +++-
 .../DeltaMerge/SkippableBlockInputStream.h    |  5 +-
 .../Storages/StorageDisaggregatedRemote.cpp   |  5 +-
 10 files changed, 172 insertions(+), 24 deletions(-)

diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h
index c47880d25a7..b4e109b665b 100644
--- a/dbms/src/Common/TiFlashMetrics.h
+++ b/dbms/src/Common/TiFlashMetrics.h
@@ -502,9 +502,12 @@ namespace DB
         Counter, \
         F(type_sche_no_pool, {"type", "sche_no_pool"}), \
         F(type_sche_no_slot, {"type", "sche_no_slot"}), \
+        F(type_sche_no_ru, {"type", "sche_no_ru"}), \
         F(type_sche_no_segment, {"type", "sche_no_segment"}), \
+        F(type_sche_active_segment_limit, {"type", "sche_active_segment_limit"}), \
         F(type_sche_from_cache, {"type", "sche_from_cache"}), \
         F(type_sche_new_task, {"type", "sche_new_task"}), \
+        F(type_ru_exhausted, {"type", "ru_exhausted"}), \
         F(type_add_cache_succ, {"type", "add_cache_succ"}), \
         F(type_add_cache_stale, {"type", "add_cache_stale"}), \
         F(type_get_cache_miss, {"type", "get_cache_miss"}), \
diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
index b916c9e554c..9c7fe38f7eb 100644
--- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
+++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
@@ -131,6 +131,14 @@ Block ColumnFileSetReader::readPKVersion(size_t offset, size_t limit)
     return block;
 }

+static Int64 columnsSize(MutableColumns & columns)
+{
+    Int64 bytes = 0;
+    for (const auto & col : columns)
+        bytes += col->byteSize();
+    return bytes;
+}
+
 size_t ColumnFileSetReader::readRows(
     MutableColumns & output_columns,
     size_t offset,
@@ -156,6 +164,7 @@ size_t ColumnFileSetReader::readRows(
     if (end == start)
         return 0;

+    auto bytes_before_read = columnsSize(output_columns);
     auto [start_file_index, rows_start_in_start_file] = locatePosByAccumulation(column_file_rows_end, start);
     auto [end_file_index, rows_end_in_end_file] = locatePosByAccumulation(column_file_rows_end, end);

@@ -187,13 +196,13 @@ size_t ColumnFileSetReader::readRows(
         }
     }

-    UInt64 delta_bytes = 0;
-    for (const auto & col : output_columns)
-        delta_bytes += col->byteSize();
-
-    lac_bytes_collector.collect(delta_bytes);
-    if (likely(context.scan_context))
-        context.scan_context->total_user_read_bytes += delta_bytes;
+    if (auto delta_bytes = columnsSize(output_columns) - bytes_before_read; delta_bytes > 0)
+    {
+        if (row_ids == nullptr)
+            lac_bytes_collector.collect(delta_bytes);
+        if (likely(context.scan_context))
+            context.scan_context->total_user_read_bytes += delta_bytes;
+    }

     return actual_read;
 }
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index cef50a6033b..851a58cc498 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -960,7 +960,8 @@ BlockInputStreams DeltaMergeStore::readRaw(
         after_segment_read,
         req_info,
         enable_read_thread,
-        final_num_stream);
+        final_num_stream,
+        dm_context->scan_context->resource_group_name);

     BlockInputStreams res;
     for (size_t i = 0; i < final_num_stream; ++i)
@@ -1062,7+1063,8 @@ void DeltaMergeStore::readRaw(
         after_segment_read,
         req_info,
         enable_read_thread,
-        final_num_stream);
+        final_num_stream,
+        dm_context->scan_context->resource_group_name);

     if (enable_read_thread)
     {
@@ -1196,7 +1198,8 @@ BlockInputStreams DeltaMergeStore::read(
         after_segment_read,
         log_tracing_id,
         enable_read_thread,
-        final_num_stream);
+        final_num_stream,
+        dm_context->scan_context->resource_group_name);

     BlockInputStreams res;
     for (size_t i = 0; i < final_num_stream; ++i)
@@ -1299,7 +1302,8 @@ void DeltaMergeStore::read(
         after_segment_read,
         log_tracing_id,
         enable_read_thread,
-        final_num_stream);
+        final_num_stream,
+        dm_context->scan_context->resource_group_name);

     if (enable_read_thread)
     {
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
index 44653b1fedf..5264f729d36 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
@@ -47,6 +47,10 @@ void MergedTask::initOnce()
             setStreamFinished(cur_idx);
             continue;
         }
+        if (pool->isRUExhausted())
+        {
+            continue;
+        }
         stream = pool->buildInputStream(task);
         fiu_do_on(FailPoints::exception_in_merged_task_init, {
             throw Exception("Fail point exception_in_merged_task_init is triggered.", ErrorCodes::FAIL_POINT_ERROR);
@@ -74,11 +78,16 @@ int MergedTask::readOneBlock()
             continue;
         }

-        if (pool->getFreeBlockSlots() <= 0)
+        if (pool->getFreeBlockSlots() <= 0 || pool->isRUExhausted())
         {
             continue;
         }

+        if (stream == nullptr)
+        {
+            stream = pool->buildInputStream(task);
+        }
+
         if (pool->readOneBlock(stream, task->segment))
         {
             read_block_count++;
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
index 26787d40d77..52d85e0d27f 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -123,9 +123,39 @@ SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector<
 bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & pool)
 {
-    return pool->getFreeBlockSlots() > 0 && // Block queue is not full and
-           (merged_task_pool.has(pool->pool_id) || // can schedule a segment from MergedTaskPool or
-            pool->getFreeActiveSegments() > 0); // schedule a new segment.
+    if (pool->getFreeBlockSlots() <= 0)
+    {
+        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment();
+        return false;
+    }
+
+    if (pool->isRUExhausted())
+    {
+        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_ru).Increment();
+        return false;
+    }
+
+    // Check if there are segments that can be scheduled:
+    // 1. There are already activated segments.
+    if (merged_task_pool.has(pool->pool_id))
+    {
+        return true;
+    }
+    // 2. The active-segment limit has not been reached and there are pending segments, so a new segment can be activated.
+    if (pool->getFreeActiveSegments() > 0 && pool->getPendingSegmentCount() > 0)
+    {
+        return true;
+    }
+
+    if (pool->getFreeActiveSegments() <= 0)
+    {
+        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_active_segment_limit).Increment();
+    }
+    else
+    {
+        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_segment).Increment();
+    }
+    return false;
 }

 SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock()
 {
@@ -145,10 +175,6 @@ SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlo
     {
         GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
     }
-    else
-    {
-        GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment();
-    }
     return nullptr;
 }

diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h
index f73eedf5ad1..e11e7b19bf1 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.h
+++ b/dbms/src/Storages/DeltaMerge/Segment.h
@@ -69,6 +69,13 @@ struct SegmentSnapshot : private boost::noncopyable
     UInt64 getRows() const { return delta->getRows() + stable->getRows(); }

     bool isForUpdate() const { return delta->isForUpdate(); }
+
+    UInt64 estimatedBytesOfInternalColumns() const
+    {
+        // TODO: how about cluster index?
+        // handle + version + flag
+        return (sizeof(Int64) + sizeof(UInt64) + sizeof(UInt8)) * getRows();
+    }
 };

 /// A segment contains many rows of a table. A table is split into segments by consecutive ranges.
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
index e2896761746..aad55f97426 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
@@ -169,6 +169,11 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
     auto block_size = std::max(
         expected_block_size,
         static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
+    if (likely(read_mode == ReadMode::Bitmap && !res_group_name.empty()))
+    {
+        auto bytes = t->read_snapshot->estimatedBytesOfInternalColumns();
+        LocalAdmissionController::global_instance->consumeResource(res_group_name, bytesToRU(bytes), 0);
+    }
     stream = t->segment->getInputStream(
         read_mode,
         *dm_context,
@@ -201,7 +206,8 @@ SegmentReadTaskPool::SegmentReadTaskPool(
     AfterSegmentRead after_segment_read_,
     const String & tracing_id,
     bool enable_read_thread_,
-    Int64 num_streams_)
+    Int64 num_streams_,
+    const String & res_group_name_)
     : pool_id(nextPoolId())
     , physical_table_id(physical_table_id_)
     , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
@@ -224,6 +230,7 @@ SegmentReadTaskPool::SegmentReadTaskPool(
     // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible,
     // situations where the computation may be faster and the storage layer may not be able to keep up.
     , active_segment_limit(std::max(num_streams_, 2))
+    , res_group_name(res_group_name_)
 {
     if (tasks_wrapper.empty())
     {
@@ -355,6 +362,7 @@ void SegmentReadTaskPool::pushBlock(Block && block)
 {
     blk_stat.push(block);
     global_blk_stat.push(block);
+    read_bytes_after_last_check += block.bytes();
     q.push(std::move(block), nullptr);
 }

@@ -383,6 +391,12 @@ Int64 SegmentReadTaskPool::getFreeActiveSegmentsUnlock() const
     return active_segment_limit - static_cast<Int64>(active_segment_ids.size());
 }

+Int64 SegmentReadTaskPool::getPendingSegmentCount() const
+{
+    std::lock_guard lock(mutex);
+    return tasks_wrapper.getTasks().size();
+}
+
 bool SegmentReadTaskPool::exceptionHappened() const
 {
     return exception_happened.load(std::memory_order_relaxed);
@@ -403,4 +417,63 @@ void SegmentReadTaskPool::setException(const DB::Exception & e)
     }
 }

+static Int64 currentMS()
+{
+    return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
+        .count();
+}
+
+static bool checkIsRUExhausted(const String & res_group_name)
+{
+    auto priority = LocalAdmissionController::global_instance->getPriority(res_group_name);
+    if (unlikely(!priority.has_value()))
+    {
+        return false;
+    }
+    return LocalAdmissionController::isRUExhausted(*priority);
+}
+
+bool SegmentReadTaskPool::isRUExhausted()
+{
+    auto res = isRUExhaustedImpl();
+    if (res)
+    {
+        GET_METRIC(tiflash_storage_read_thread_counter, type_ru_exhausted).Increment();
+    }
+    return res;
+}
+
+bool SegmentReadTaskPool::isRUExhaustedImpl()
+{
+    if (unlikely(res_group_name.empty() || LocalAdmissionController::global_instance == nullptr))
+    {
+        return false;
+    }
+
+    // To reduce lock contention in resource control,
+    // check whether RU is exhausted at most once every `bytes_of_one_hundred_ru` read bytes or every `100ms`.
+
+    // Fast path.
+    Int64 ms = currentMS();
+    if (read_bytes_after_last_check < bytes_of_one_hundred_ru && ms - last_time_check_ru < check_ru_interval_ms)
+    {
+        return ru_is_exhausted; // Return the result of the last check.
+    }
+
+    std::lock_guard lock(ru_mu);
+    // If another thread has just checked whether RU is exhausted, reuse its result.
+    // Attention: `read_bytes_after_last_check` can be written concurrently in `pushBlock`.
+    ms = currentMS();
+    if (read_bytes_after_last_check < bytes_of_one_hundred_ru && ms - last_time_check_ru < check_ru_interval_ms)
+    {
+        return ru_is_exhausted; // Return the result of the last check.
+    }
+
+    // Check and reset everything.
+    read_bytes_after_last_check = 0;
+    ru_is_exhausted = checkIsRUExhausted(res_group_name);
+    last_time_check_ru = ms;
+    return ru_is_exhausted;
+}
+
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
index a723a25000e..18bd66e7481 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -167,7 +167,8 @@ class SegmentReadTaskPool : private boost::noncopyable
         AfterSegmentRead after_segment_read_,
         const String & tracing_id,
         bool enable_read_thread_,
-        Int64 num_streams_);
+        Int64 num_streams_,
+        const String & res_group_name_);

     ~SegmentReadTaskPool()
     {
@@ -216,6 +217,7 @@ class SegmentReadTaskPool : private boost::noncopyable
     Int64 decreaseUnorderedInputStreamRefCount();
     Int64 getFreeBlockSlots() const;
     Int64 getFreeActiveSegments() const;
+    Int64 getPendingSegmentCount() const;
     bool valid() const;
     void setException(const DB::Exception & e);

@@ -245,12 +247,16 @@ class SegmentReadTaskPool : private boost::noncopyable
         }
     }

+    bool isRUExhausted();
+
 private:
     Int64 getFreeActiveSegmentsUnlock() const;
     bool exceptionHappened() const;
     void finishSegment(const SegmentPtr & seg);
     void pushBlock(Block && block);

+    bool isRUExhaustedImpl();
+
     const int extra_table_id_index;
     DMContextPtr dm_context;
     ColumnDefines columns_to_read;
@@ -280,9 +286,16 @@ class SegmentReadTaskPool : private boost::noncopyable
     const Int64 block_slot_limit;
     const Int64 active_segment_limit;

+    const String res_group_name;
+    std::mutex ru_mu;
+    std::atomic<Int64> last_time_check_ru = 0;
+    std::atomic<bool> ru_is_exhausted = false;
+    std::atomic<Int64> read_bytes_after_last_check = 0;
+
     inline static std::atomic<uint64_t> pool_id_gen{1};
     inline static BlockStat global_blk_stat;
     static uint64_t nextPoolId() { return pool_id_gen.fetch_add(1, std::memory_order_relaxed); }
+    inline static constexpr Int64 check_ru_interval_ms = 100;
 };

 using SegmentReadTaskPoolPtr = std::shared_ptr<SegmentReadTaskPool>;
diff --git a/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
index 47713ed3884..bab6d5db832 100644
--- a/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
@@ -221,7 +221,10 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
         if (likely(scan_context != nullptr))
         {
             scan_context->total_user_read_bytes += bytes;
-            lac_bytes_collector.collect(bytes);
+            if constexpr (!need_row_id)
+            {
+                lac_bytes_collector.collect(bytes);
+            }
         }
     }
     BlockInputStreams::iterator current_stream;
diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
index bb98cb73cd2..4c7fafaddb5 100644
--- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp
+++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp
@@ -122,8 +122,9 @@ DM::Remote::RNReadTaskPtr StorageDisaggregated::buildReadTaskWithBackoff(const C
 {
     using namespace pingcap;

-    auto scan_context = std::make_shared<DM::ScanContext>();
-    context.getDAGContext()->scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
+    auto * dag_context = context.getDAGContext();
+    auto scan_context = std::make_shared<DM::ScanContext>(dag_context->getResourceGroupName());
+    dag_context->scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;

     DM::Remote::RNReadTaskPtr read_task;
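
Illustrative sketch (not part of the patch): the core of the change is the throttled RU check in
SegmentReadTaskPool::isRUExhaustedImpl(), which runs the expensive "is RU exhausted" query at most
once per `bytes_of_one_hundred_ru` read bytes or per 100 ms and lets concurrent readers reuse the
cached answer. The standalone class below is a hypothetical stand-in for the pool's members
(read_bytes_after_last_check, last_time_check_ru, ru_is_exhausted) and for LocalAdmissionController;
names and constants are assumptions made for the example only.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <mutex>

// Hypothetical sketch of the throttled "is RU exhausted" check used by the patch.
class ThrottledRUCheck
{
public:
    // `query` stands in for LocalAdmissionController::getPriority + isRUExhausted.
    template <typename F>
    bool isExhausted(F && query)
    {
        auto now = nowMS();
        // Fast path: reuse the cached result while under both thresholds.
        if (bytes_since_check.load() < bytes_threshold && now - last_check_ms.load() < interval_ms)
            return exhausted.load();

        std::lock_guard lock(mu);
        now = nowMS();
        // Double check: another thread may have refreshed the result while we waited for the lock.
        if (bytes_since_check.load() < bytes_threshold && now - last_check_ms.load() < interval_ms)
            return exhausted.load();

        bytes_since_check.store(0);
        exhausted.store(query());
        last_check_ms.store(now);
        return exhausted.load();
    }

    // Called from the read path, analogous to pushBlock() accumulating block.bytes().
    void addReadBytes(int64_t bytes) { bytes_since_check.fetch_add(bytes); }

private:
    static int64_t nowMS()
    {
        using namespace std::chrono;
        return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
    }

    static constexpr int64_t bytes_threshold = 16 * 1024 * 1024; // hypothetical stand-in for bytes_of_one_hundred_ru
    static constexpr int64_t interval_ms = 100;                  // mirrors check_ru_interval_ms in the patch

    std::mutex mu;
    std::atomic<int64_t> bytes_since_check{0};
    std::atomic<int64_t> last_check_ms{0};
    std::atomic<bool> exhausted{false};
};

Usage would mirror the patch: call addReadBytes() for every block pushed to the result queue, and
call isExhausted() before activating a segment or reading another block, so the fast path stays
lock-free and the resource-control lock is touched only occasionally.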